Compare commits

...

5 Commits

3 changed files with 72 additions and 41 deletions

10
conn.go
View File

@ -33,7 +33,7 @@ func bridge() {
clientSignalChan.TX <- msg
}
}
func handle(clientOriginal net.Conn) {
func handleConn(clientOriginal net.Conn) {
client := timeoutConn{clientOriginal}
defer clientOriginal.Close()
queryChan := make(QueryChan)
@ -66,18 +66,16 @@ func handle(clientOriginal net.Conn) {
wg.Wait()
GetLogger().Infof("Connection from %s closed", clientOriginal.RemoteAddr())
}
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
queryChan <- STOPPED
st, _ := <-queryChan
switch st {
case RUNNING:
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
proceed()
case STOPPED, WAITING:
// client.Write([]byte("Server not ready!"))
GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state])
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
curChan := make(chan MCState)
defer close(curChan)
clientSignalChan.Add(curChan)
@ -106,6 +104,6 @@ func Listen() {
}
//conn.SetReadDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second))
GetLogger().Infof("New connection from %s", conn.RemoteAddr())
go handle(conn)
go handleConn(conn)
}
}

99
ds.go
View File

@ -58,6 +58,42 @@ const (
DELETE
)
type Queue[T any] struct {
items []T
}
func NewQueue[T any]() *Queue[T] {
return &Queue[T]{
items: make([]T, 0),
}
}
func (s *Queue[T]) Push(item T) {
s.items = append(s.items, item)
}
func (s *Queue[T]) Pop() T {
l := len(s.items)
if l == 0 {
panic("Stack: pop")
}
ret := s.items[0]
s.items = s.items[1:]
return ret
}
func (s Queue[T]) Peek() T {
l := len(s.items)
if l == 0 {
panic("Stack: peek")
}
return s.items[l]
}
func (s Queue[T]) IsEmpty() bool {
return len(s.items) == 0
}
func (s Queue[T]) Length() int {
return len(s.items)
}
type modify_t[T constraints.Ordered] struct {
op op
id int
@ -79,26 +115,22 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
modify: make(chan modify_t[T]),
}
mode := m
reloadTX := make(chan struct{})
reloadRX := make(chan struct{})
reload := make(chan struct{})
used := make(map[int]bool)
unused := make(map[int]bool)
delta := -1
// delta := -1
list := make([]chan T, 0)
selectCases := make([]reflect.SelectCase, 1)
viewCnt := 0
addQue := NewQueue[int]()
go func() {
for {
modification, _ := <-ret.modify
op := modification.op
id := modification.id
ch := modification.ch
reloadTX <- struct{}{}
if mode == 2 {
reloadTX <- struct{}{}
}
<-reloadRX // ok it's safe to do operation
if mode == 2 {
<-reloadRX
for range viewCnt {
reload <- struct{}{}
}
switch op {
case ADD:
@ -108,7 +140,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
Send: reflect.Value{},
}
if len(unused) == 0 {
delta = len(list)
addQue.Push(len(list))
used[len(list)] = true
list = append(list, ch)
selectCases = append(selectCases, nelem)
@ -118,7 +150,7 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
which = x
break
}
delta = which
addQue.Push(which)
used[which] = true
delete(unused, which)
list[which] = ch
@ -135,21 +167,17 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
}
//deleted <- struct{}{} // <-reloadTX //extra communication
}
reloadTX <- struct{}{}
if mode == 2 {
reloadTX <- struct{}{} //I'm finished
for range viewCnt {
reload <- struct{}{}
}
<-reloadRX
if mode == 2 {
<-reloadRX //ok I'll cleanup
}
delta = -1
}
}()
go func() {
viewCnt++
selectCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(reloadTX),
Chan: reflect.ValueOf(reload),
}
prevId := -1
for {
@ -164,10 +192,10 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
//
// delete(used, id)
ret.modify <- modify_t[T]{DELETE, id - 1, nil}
<-reloadTX
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
<-reload
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
// reloadRX <- struct{}{} //I've read all deltas, you can release them
continue
}
if mode == 1 && id != 0 {
@ -179,26 +207,31 @@ func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMulti
prevId = id
}
} else if id == 0 {
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
// reloadRX <- struct{}{} //I've read all deltas, you can release them
}
}
}()
if mode == 2 {
go func() {
viewCnt++
msgList := make([]T, 0)
for {
select {
case <-reloadTX:
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
if delta != -1 {
case <-reload:
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reload //waiting for you finished
if addQue.Length() == 0 {
continue
}
for !addQue.IsEmpty() {
delta := addQue.Pop()
for _, x := range msgList {
list[delta] <- x
}
}
reloadRX <- struct{}{} //I've read all deltas, you can release them
// reloadRX <- struct{}{} //I've read all deltas, you can release them
case msg, _ := <-ret.TX:
for id, ch := range list {
if used[id] {

4
fsm.go
View File

@ -137,7 +137,7 @@ func handleState() { //goroutine only, handles both write and read
handleWaitingToRunning()
//case RUNNING:
default:
panic("handleState():written wrong!" + stateToStr[state])
//panic("handleState():written wrong!" + stateToStr[state])
}
case EMPTY:
switch state {
@ -145,7 +145,7 @@ func handleState() { //goroutine only, handles both write and read
handleRunningToWaiting()
//case BOOTING:
default:
panic("handleState():written wrong!" + stateToStr[state])
//panic("handleState():written wrong!" + stateToStr[state])
}
}
case <-multiChan.RX: