From d10e497d618c9ff3b80e49ddd6b849fd03b4b468 Mon Sep 17 00:00:00 2001 From: MahnoKropotkinvich Date: Tue, 24 Dec 2024 15:15:27 +0800 Subject: [PATCH] now usable, will soon released as v0.1 --- .gitignore | 1 + chan.go | 14 +++++ conn.go | 41 ++++++++---- daemon.go | 5 +- ds.go | 177 ++++++++++++++++++++++++++++++++++++++++++++++++++-- fsm.go | 140 ++++++++++++++++++++--------------------- go.mod | 9 ++- typeconv.go | 19 ++++++ 8 files changed, 315 insertions(+), 91 deletions(-) create mode 100644 chan.go create mode 100644 typeconv.go diff --git a/.gitignore b/.gitignore index e4cd5ab..712c215 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ bin/ go.sum +.vscode/ \ No newline at end of file diff --git a/chan.go b/chan.go new file mode 100644 index 0000000..ef76f9f --- /dev/null +++ b/chan.go @@ -0,0 +1,14 @@ +package main + +type GenericChan chan int +type QueryChan chan MCState +type SignalChan chan MCState + +var ( + CntChan = make(chan CntEvent) + QueryChanChan = make(chan QueryChan) +) +var CriticalSignalChan = make(chan MCState) + +var DaemonChanRX = make(chan struct{}) +var DaemonChanTX = make(chan struct{}) diff --git a/conn.go b/conn.go index f3b5bb4..393dcfd 100644 --- a/conn.go +++ b/conn.go @@ -7,32 +7,43 @@ import ( "time" ) -type timeoutConn struct{ +type timeoutConn struct { conn net.Conn - } -func (c timeoutConn) Read(buf []byte) (int,error) { - c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout)*time.Second)) + +func (c timeoutConn) Read(buf []byte) (int, error) { + c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second)) return c.conn.Read(buf) } -func (c timeoutConn) Write(buf []byte) (int,error) { - c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout)*time.Second)) +func (c timeoutConn) Write(buf []byte) (int, error) { + c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second)) return c.conn.Write(buf) } +type signalChan chan MCState +var signalChanChan = make(chan signalChan) + +// 单条管道->多条管道派发器 goroutine only +var clientSignalChan = NewDynamicMultiChan[MCState](false) + +func bridge() { + for { + msg, _ := <-CriticalSignalChan + clientSignalChan.TX <- msg + } +} func handle(clientOriginal net.Conn) { client := timeoutConn{clientOriginal} defer clientOriginal.Close() queryChan := make(QueryChan) - ChanChan <- queryChan + QueryChanChan <- queryChan proceed := func() { server, err := net.Dial("tcp", "127.0.0.1:25565") if err != nil { GetLogger().Errorf("Failed to connect to MC server: %v", err) return } - // TODO:better way of setting connection timeout defer server.Close() var wg sync.WaitGroup wg.Add(2) @@ -61,17 +72,25 @@ func handle(clientOriginal net.Conn) { proceed() case STOPPED, WAITING: // client.Write([]byte("Server not ready!")) - GetLogger().Warnf("Connection queued, server currently at %s state", stateToStr[state]) + GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state]) CntChan <- INCREASE defer func() { CntChan <- DECREASE }() - <-RunningChan - proceed() + curChan := make(chan MCState) + clientSignalChan.Add(curChan) + state, _ := <-curChan + if state == RUNNING { + proceed() + } else { + client.Write([]byte("You are too late, server dying!")) + GetLogger().Warnf("Connection refused, server currently at %s state", stateToStr[state]) + } default: client.Write([]byte("Server not ready!")) GetLogger().Warnf("Connection refused, server currently at %s state", stateToStr[state]) } } func Listen() { + go bridge() listener, _ := net.Listen("tcp", "0.0.0.0:"+config.Port) defer listener.Close() GetLogger().Infof("Listening on %s", config.Port) diff --git a/daemon.go b/daemon.go index 2ca336f..fc695ab 100644 --- a/daemon.go +++ b/daemon.go @@ -8,8 +8,6 @@ import ( ) var proc *exec.Cmd -var DaemonChanRX = make(chan struct{}) -var DaemonChanTX = make(chan struct{}) func Stopped() { <-DaemonChanTX @@ -34,7 +32,8 @@ func Running() { DaemonChanRX <- struct{}{} } func Stopping() { - proc.Process.Signal(syscall.SIGTERM) + proc.Process.Signal(syscall.SIGINT) + proc.Wait() go Stopped() DaemonChanRX <- struct{}{} diff --git a/ds.go b/ds.go index 1c643a2..932df9b 100644 --- a/ds.go +++ b/ds.go @@ -1,16 +1,30 @@ package main +import ( + "reflect" + + "golang.org/x/exp/constraints" +) + +type LinearDS[T any] interface { + Push(T) + Pop() T + Peek() T + IsEmpty() bool + Length() int +} + // this DS does not provide thread security +type Stack[T any] struct { + items []T +} + func NewStack[T any]() *Stack[T] { return &Stack[T]{ items: make([]T, 0), } } -type Stack[T any] struct { - items []T -} - func (s *Stack[T]) Push(item T) { s.items = append(s.items, item) } @@ -33,3 +47,158 @@ func (s Stack[T]) Peek() T { func (s Stack[T]) IsEmpty() bool { return len(s.items) == 0 } +func (s Stack[T]) Length() int { + return len(s.items) +} + +type op int + +const ( + ADD op = iota + DELETE +) + +type modify_t[T constraints.Ordered] struct { + op op + id int + ch chan T +} +type DynamicMultiChan[T constraints.Ordered] struct { + TX chan T + RX chan T + // used map[int]bool + // list []chan T + // reloadTX chan struct{} + // reloadRX chan struct{} + // selectCases []reflect.SelectCase + reply bool + // delta *Stack[int] + modify chan modify_t[T] +} + +func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] { + ret := &DynamicMultiChan[T]{ + TX: make(chan T), + RX: make(chan T), + + reply: reply, + modify: make(chan modify_t[T]), + } + reloadTX := make(chan struct{}) + reloadRX := make(chan struct{}) + used := make(map[int]bool) + unused := make(map[int]bool) + delta := -1 + list := make([]chan T, 0) + selectCases := make([]reflect.SelectCase, 1) + + go func() { + for { + modification, _ := <-ret.modify + op := modification.op + id := modification.id + ch := modification.ch + reloadTX <- struct{}{} + reloadTX <- struct{}{} + <-reloadRX + <-reloadRX // ok it's safe to do operation + switch op { + case ADD: + nelem := reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + } + if len(unused) == 0 { + delta = len(list) + used[len(list)] = true + list = append(list, ch) + selectCases = append(selectCases, nelem) + } else { + var which int + for x := range unused { + which = x + break + } + delta = which + used[which] = true + delete(unused, which) + list[which] = ch + selectCases[which] = nelem + } + case DELETE: + delete(used, id) + unused[id] = true + } + reloadTX <- struct{}{} + reloadTX <- struct{}{} //I'm finished + <-reloadRX + <-reloadRX //ok I'll cleanup + delta = -1 + } + }() + go func() { + selectCases[0] = reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(reloadTX), + } + prevId := -1 + for { + id, recv, ok := reflect.Select(selectCases) + if prevId != -1 { + selectCases[prevId].Dir = reflect.SelectRecv + selectCases[prevId].Send = reflect.Value{} + prevId = -1 + } + if !ok { + // ret.used.Push(id) + // + // delete(used, id) + ret.modify <- modify_t[T]{DELETE, id, nil} + continue + } + if id != 0 { + ret.RX <- To[T](recv) // possible for below to send to Chan? + if reply { + msg, _ := <-ret.TX + selectCases[id].Dir = reflect.SelectSend + selectCases[id].Send = reflect.ValueOf(msg) + prevId = id + } + } else { + reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reloadTX //waiting for you finished + reloadRX <- struct{}{} //I've read all deltas, you can release them + } + } + }() + go func() { + msgList := make([]T, 0) + for { + select { + case <-reloadTX: + reloadRX <- struct{}{} //I'm currently not dealing with other chan + <-reloadTX //waiting for you finished + if delta != -1 { + for _, x := range msgList { + list[delta] <- x + } + } + reloadRX <- struct{}{} //I've read all deltas, you can release them + case msg, _ := <-ret.TX: + for _, ch := range list { + ch <- msg + } + msgList = append(msgList, msg) + } + } + }() + return ret +} +func (self DynamicMultiChan[T]) IsReply() bool { + return self.reply +} +func (self *DynamicMultiChan[T]) Add(ch chan T) { + self.modify <- modify_t[T]{ADD, -1, ch} +} + +// TODO: we need to wait for 2 msg and send 4 msg in chan diff --git a/fsm.go b/fsm.go index 35f8485..d6dbbf6 100644 --- a/fsm.go +++ b/fsm.go @@ -2,7 +2,6 @@ package main import ( "context" - "reflect" "runtime/debug" "time" ) @@ -15,16 +14,11 @@ const ( ) // luckily I use int for them all -type GenericChan chan int -type QueryChan chan MCState - var ( state MCState = STOPPED // we don't need mutex for cnt because with channel it's guaranteed to be processed sequently - CntChan = make(chan CntEvent) - eventChan = make(chan globalConnEvent) + globalConnEventChan = make(chan globalConnEvent) // the channel used to pass channel to build directional communication - ChanChan = make(chan QueryChan) ) var cnt int @@ -40,7 +34,7 @@ func handleCnt() { //goroutine only, handles cnt related message GetLogger().Debug("handleCnt(): connection cnt increased") cnt++ if cnt == 1 { //which means it is 0->1 - eventChan <- INCOMING + globalConnEventChan <- INCOMING } case DECREASE: GetLogger().Debug("handleCnt(): connection cnt decreased") @@ -49,7 +43,7 @@ func handleCnt() { //goroutine only, handles cnt related message debug.Stack() panic("cnt processor was written wrong!") } else if cnt == 0 { - eventChan <- EMPTY + globalConnEventChan <- EMPTY } } } @@ -83,6 +77,7 @@ func handleWaitingToStopping() { func stoppingThread() { DaemonChanTX <- struct{}{} <-DaemonChanRX + <-DaemonChanRX handleStoppingToStopped() } func handleStoppingToStopped() { @@ -92,15 +87,14 @@ func handleStoppingToStopped() { func bootingThread() { DaemonChanTX <- struct{}{} <-DaemonChanRX + <-DaemonChanRX handleBootingToRunning() } -var RunningChan = make(chan struct{}) - func handleBootingToRunning() { GetLogger().Infof("Server is currently at %s state", stateToStr[RUNNING]) state = RUNNING - RunningChan <- struct{}{} + CriticalSignalChan <- RUNNING } func handleStoppedToBooting() { GetLogger().Infof("Server is currently at %s state", stateToStr[BOOTING]) @@ -121,82 +115,84 @@ func waitingThread() { } } func handleState() { //goroutine only, handles both write and read - unused := NewStack[int]() - const ( - // CMD must come first - CHANCHAN = iota - EVENTCHAN - SIZE - ) + // unused := NewStack[int]() + // const ( + // // CMD must come first + // CHANCHAN = iota + // EVENTCHAN + // SIZE + // ) - selectCases := make([]reflect.SelectCase, 1) + // selectCases := make([]reflect.SelectCase, 1) //selectCases[CHANCHAN] = reflect.SelectCase{ // Dir: reflect.SelectRecv, // readonly - // Chan: reflect.ValueOf(ChanChan), + // Chan: reflect.ValueOf(QueryChanChan), //} //selectCases[EVENTCHAN] = reflect.SelectCase{ // Dir: reflect.SelectRecv, // readonly - // Chan: reflect.ValueOf(eventChan), + // Chan: reflect.ValueOf(globalConnEventChan), //} //selectCases[CMDCHAN] = reflect.SelectCase{ // Dir: reflect.SelectRecv, // Chan: reflect.ValueOf(cmdChan), //} - packagedChan := make(QueryChan) - selectCases[0] = reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(packagedChan), - } - addChan := func(Chan QueryChan) { - logger.Debug("addChan(): adding new chan from chanchan") - nelem := reflect.SelectCase{ - Dir: reflect.SelectRecv, - Chan: reflect.ValueOf(Chan), - } - logger.Debug("addChan(): done") - if unused.IsEmpty() { - selectCases = append(selectCases, nelem) - } else { - selectCases[unused.Pop()] = nelem - } - } - const SIGNAL = 114514 - queryThread := func() { - prevId := -1 - for { - id, recv, ok := reflect.Select(selectCases) - logger.Debugf("queryThread(): recv message from No.%d", id) - if prevId != -1 { - logger.Debug("queryThread(): clearing previous") - selectCases[prevId].Dir = reflect.SelectRecv - selectCases[prevId].Send = reflect.Value{} - prevId = -1 - } - if !ok { - unused.Push(id) - continue - } - if id != 0 { - packagedChan <- MCState(recv.Int()) - state, _ := <-packagedChan - selectCases[id].Dir = reflect.SelectSend - selectCases[id].Send = reflect.ValueOf(state) - prevId = id - } + // packagedChan := make(QueryChan) + // selectCases[0] = reflect.SelectCase{ + // Dir: reflect.SelectRecv, + // Chan: reflect.ValueOf(packagedChan), + // } + // addChan := func(Chan QueryChan) { + // logger.Debug("addChan(): adding new chan from chanchan") + // nelem := reflect.SelectCase{ + // Dir: reflect.SelectRecv, + // Chan: reflect.ValueOf(Chan), + // } + // logger.Debug("addChan(): done") + // if unused.IsEmpty() { + // selectCases = append(selectCases, nelem) + // } else { + // selectCases[unused.Pop()] = nelem + // } + // } + // const SIGNAL = 114514 + // queryThread := func() { + // prevId := -1 + // for { + // id, recv, ok := reflect.Select(selectCases) + // logger.Debugf("queryThread(): recv message from No.%d", id) + // if prevId != -1 { + // logger.Debug("queryThread(): clearing previous") + // selectCases[prevId].Dir = reflect.SelectRecv + // selectCases[prevId].Send = reflect.Value{} + // prevId = -1 + // } + // if !ok { + // unused.Push(id) + // continue + // } + // if id != 0 { + // packagedChan <- MCState(recv.Int()) + // state, _ := <-packagedChan + // selectCases[id].Dir = reflect.SelectSend + // selectCases[id].Send = reflect.ValueOf(state) + // prevId = id + // } - } - } - go queryThread() + // } + // } + // go queryThread() logger.Debug("handleState(): ready") + multiChan := NewDynamicMultiChan[MCState](true) for { select { - case nchan, _ := <-ChanChan: - addChan(nchan) - packagedChan <- SIGNAL + case nchan, _ := <-QueryChanChan: + multiChan.Add(nchan) + //addChan(nchan) + // packagedChan <- SIGNAL //selectCases[id].Dir = reflect.SelectSend //selectCases[id].Send = reflect.ValueOf(make(QueryChan)) //prevId = id - case event, _ := <-eventChan: + case event, _ := <-globalConnEventChan: // wait until transformation finishes switch event { case INCOMING: @@ -219,8 +215,8 @@ func handleState() { //goroutine only, handles both write and read panic("handleState():written wrong!" + stateToStr[state]) } } - case <-packagedChan: - packagedChan <- state + case <-multiChan.RX: + multiChan.TX <- state } } diff --git a/go.mod b/go.mod index 2f0ff75..170c136 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,14 @@ module minimcd go 1.23.4 require ( + github.com/alecthomas/assert/v2 v2.11.0 github.com/sirupsen/logrus v1.9.3 - golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect gopkg.in/yaml.v2 v2.4.0 ) + +require ( + github.com/alecthomas/repr v0.4.0 // indirect + github.com/hexops/gotextdiff v1.0.3 // indirect + golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 + golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect +) diff --git a/typeconv.go b/typeconv.go new file mode 100644 index 0000000..1e508e3 --- /dev/null +++ b/typeconv.go @@ -0,0 +1,19 @@ +package main + +import ( + "reflect" + + "golang.org/x/exp/constraints" +) + +func To[T constraints.Ordered](x reflect.Value) T { + if x.Type() == reflect.TypeOf(*new(T)) { + panic("To(): type does not match") + } + ret, ok := x.Interface().(T) + if !ok { + panic("To(): doesn't work") + } else { + return ret + } +}