now usable, will soon released as v0.1

This commit is contained in:
MahnoKropotkinvich 2024-12-24 15:15:27 +08:00
parent d1b1ca8dc0
commit d10e497d61
8 changed files with 315 additions and 91 deletions

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
bin/
go.sum
.vscode/

14
chan.go Normal file
View File

@ -0,0 +1,14 @@
package main
type GenericChan chan int
type QueryChan chan MCState
type SignalChan chan MCState
var (
CntChan = make(chan CntEvent)
QueryChanChan = make(chan QueryChan)
)
var CriticalSignalChan = make(chan MCState)
var DaemonChanRX = make(chan struct{})
var DaemonChanTX = make(chan struct{})

41
conn.go
View File

@ -7,32 +7,43 @@ import (
"time"
)
type timeoutConn struct{
type timeoutConn struct {
conn net.Conn
}
func (c timeoutConn) Read(buf []byte) (int,error) {
c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout)*time.Second))
func (c timeoutConn) Read(buf []byte) (int, error) {
c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second))
return c.conn.Read(buf)
}
func (c timeoutConn) Write(buf []byte) (int,error) {
c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout)*time.Second))
func (c timeoutConn) Write(buf []byte) (int, error) {
c.conn.SetDeadline(time.Now().Add(time.Duration(config.ConnectTimeout) * time.Second))
return c.conn.Write(buf)
}
type signalChan chan MCState
var signalChanChan = make(chan signalChan)
// 单条管道->多条管道派发器 goroutine only
var clientSignalChan = NewDynamicMultiChan[MCState](false)
func bridge() {
for {
msg, _ := <-CriticalSignalChan
clientSignalChan.TX <- msg
}
}
func handle(clientOriginal net.Conn) {
client := timeoutConn{clientOriginal}
defer clientOriginal.Close()
queryChan := make(QueryChan)
ChanChan <- queryChan
QueryChanChan <- queryChan
proceed := func() {
server, err := net.Dial("tcp", "127.0.0.1:25565")
if err != nil {
GetLogger().Errorf("Failed to connect to MC server: %v", err)
return
}
// TODO:better way of setting connection timeout
defer server.Close()
var wg sync.WaitGroup
wg.Add(2)
@ -61,17 +72,25 @@ func handle(clientOriginal net.Conn) {
proceed()
case STOPPED, WAITING:
// client.Write([]byte("Server not ready!"))
GetLogger().Warnf("Connection queued, server currently at %s state", stateToStr[state])
GetLogger().Infof("Connection queued, server currently at %s state", stateToStr[state])
CntChan <- INCREASE
defer func() { CntChan <- DECREASE }()
<-RunningChan
proceed()
curChan := make(chan MCState)
clientSignalChan.Add(curChan)
state, _ := <-curChan
if state == RUNNING {
proceed()
} else {
client.Write([]byte("You are too late, server dying!"))
GetLogger().Warnf("Connection refused, server currently at %s state", stateToStr[state])
}
default:
client.Write([]byte("Server not ready!"))
GetLogger().Warnf("Connection refused, server currently at %s state", stateToStr[state])
}
}
func Listen() {
go bridge()
listener, _ := net.Listen("tcp", "0.0.0.0:"+config.Port)
defer listener.Close()
GetLogger().Infof("Listening on %s", config.Port)

View File

@ -8,8 +8,6 @@ import (
)
var proc *exec.Cmd
var DaemonChanRX = make(chan struct{})
var DaemonChanTX = make(chan struct{})
func Stopped() {
<-DaemonChanTX
@ -34,7 +32,8 @@ func Running() {
DaemonChanRX <- struct{}{}
}
func Stopping() {
proc.Process.Signal(syscall.SIGTERM)
proc.Process.Signal(syscall.SIGINT)
proc.Wait()
go Stopped()
DaemonChanRX <- struct{}{}

177
ds.go
View File

@ -1,16 +1,30 @@
package main
import (
"reflect"
"golang.org/x/exp/constraints"
)
type LinearDS[T any] interface {
Push(T)
Pop() T
Peek() T
IsEmpty() bool
Length() int
}
// this DS does not provide thread security
type Stack[T any] struct {
items []T
}
func NewStack[T any]() *Stack[T] {
return &Stack[T]{
items: make([]T, 0),
}
}
type Stack[T any] struct {
items []T
}
func (s *Stack[T]) Push(item T) {
s.items = append(s.items, item)
}
@ -33,3 +47,158 @@ func (s Stack[T]) Peek() T {
func (s Stack[T]) IsEmpty() bool {
return len(s.items) == 0
}
func (s Stack[T]) Length() int {
return len(s.items)
}
type op int
const (
ADD op = iota
DELETE
)
type modify_t[T constraints.Ordered] struct {
op op
id int
ch chan T
}
type DynamicMultiChan[T constraints.Ordered] struct {
TX chan T
RX chan T
// used map[int]bool
// list []chan T
// reloadTX chan struct{}
// reloadRX chan struct{}
// selectCases []reflect.SelectCase
reply bool
// delta *Stack[int]
modify chan modify_t[T]
}
func NewDynamicMultiChan[T constraints.Ordered](reply bool) *DynamicMultiChan[T] {
ret := &DynamicMultiChan[T]{
TX: make(chan T),
RX: make(chan T),
reply: reply,
modify: make(chan modify_t[T]),
}
reloadTX := make(chan struct{})
reloadRX := make(chan struct{})
used := make(map[int]bool)
unused := make(map[int]bool)
delta := -1
list := make([]chan T, 0)
selectCases := make([]reflect.SelectCase, 1)
go func() {
for {
modification, _ := <-ret.modify
op := modification.op
id := modification.id
ch := modification.ch
reloadTX <- struct{}{}
reloadTX <- struct{}{}
<-reloadRX
<-reloadRX // ok it's safe to do operation
switch op {
case ADD:
nelem := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
}
if len(unused) == 0 {
delta = len(list)
used[len(list)] = true
list = append(list, ch)
selectCases = append(selectCases, nelem)
} else {
var which int
for x := range unused {
which = x
break
}
delta = which
used[which] = true
delete(unused, which)
list[which] = ch
selectCases[which] = nelem
}
case DELETE:
delete(used, id)
unused[id] = true
}
reloadTX <- struct{}{}
reloadTX <- struct{}{} //I'm finished
<-reloadRX
<-reloadRX //ok I'll cleanup
delta = -1
}
}()
go func() {
selectCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(reloadTX),
}
prevId := -1
for {
id, recv, ok := reflect.Select(selectCases)
if prevId != -1 {
selectCases[prevId].Dir = reflect.SelectRecv
selectCases[prevId].Send = reflect.Value{}
prevId = -1
}
if !ok {
// ret.used.Push(id)
//
// delete(used, id)
ret.modify <- modify_t[T]{DELETE, id, nil}
continue
}
if id != 0 {
ret.RX <- To[T](recv) // possible for below to send to Chan?
if reply {
msg, _ := <-ret.TX
selectCases[id].Dir = reflect.SelectSend
selectCases[id].Send = reflect.ValueOf(msg)
prevId = id
}
} else {
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
reloadRX <- struct{}{} //I've read all deltas, you can release them
}
}
}()
go func() {
msgList := make([]T, 0)
for {
select {
case <-reloadTX:
reloadRX <- struct{}{} //I'm currently not dealing with other chan
<-reloadTX //waiting for you finished
if delta != -1 {
for _, x := range msgList {
list[delta] <- x
}
}
reloadRX <- struct{}{} //I've read all deltas, you can release them
case msg, _ := <-ret.TX:
for _, ch := range list {
ch <- msg
}
msgList = append(msgList, msg)
}
}
}()
return ret
}
func (self DynamicMultiChan[T]) IsReply() bool {
return self.reply
}
func (self *DynamicMultiChan[T]) Add(ch chan T) {
self.modify <- modify_t[T]{ADD, -1, ch}
}
// TODO: we need to wait for 2 msg and send 4 msg in chan

140
fsm.go
View File

@ -2,7 +2,6 @@ package main
import (
"context"
"reflect"
"runtime/debug"
"time"
)
@ -15,16 +14,11 @@ const (
)
// luckily I use int for them all
type GenericChan chan int
type QueryChan chan MCState
var (
state MCState = STOPPED
// we don't need mutex for cnt because with channel it's guaranteed to be processed sequently
CntChan = make(chan CntEvent)
eventChan = make(chan globalConnEvent)
globalConnEventChan = make(chan globalConnEvent)
// the channel used to pass channel to build directional communication
ChanChan = make(chan QueryChan)
)
var cnt int
@ -40,7 +34,7 @@ func handleCnt() { //goroutine only, handles cnt related message
GetLogger().Debug("handleCnt(): connection cnt increased")
cnt++
if cnt == 1 { //which means it is 0->1
eventChan <- INCOMING
globalConnEventChan <- INCOMING
}
case DECREASE:
GetLogger().Debug("handleCnt(): connection cnt decreased")
@ -49,7 +43,7 @@ func handleCnt() { //goroutine only, handles cnt related message
debug.Stack()
panic("cnt processor was written wrong!")
} else if cnt == 0 {
eventChan <- EMPTY
globalConnEventChan <- EMPTY
}
}
}
@ -83,6 +77,7 @@ func handleWaitingToStopping() {
func stoppingThread() {
DaemonChanTX <- struct{}{}
<-DaemonChanRX
<-DaemonChanRX
handleStoppingToStopped()
}
func handleStoppingToStopped() {
@ -92,15 +87,14 @@ func handleStoppingToStopped() {
func bootingThread() {
DaemonChanTX <- struct{}{}
<-DaemonChanRX
<-DaemonChanRX
handleBootingToRunning()
}
var RunningChan = make(chan struct{})
func handleBootingToRunning() {
GetLogger().Infof("Server is currently at %s state", stateToStr[RUNNING])
state = RUNNING
RunningChan <- struct{}{}
CriticalSignalChan <- RUNNING
}
func handleStoppedToBooting() {
GetLogger().Infof("Server is currently at %s state", stateToStr[BOOTING])
@ -121,82 +115,84 @@ func waitingThread() {
}
}
func handleState() { //goroutine only, handles both write and read
unused := NewStack[int]()
const (
// CMD must come first
CHANCHAN = iota
EVENTCHAN
SIZE
)
// unused := NewStack[int]()
// const (
// // CMD must come first
// CHANCHAN = iota
// EVENTCHAN
// SIZE
// )
selectCases := make([]reflect.SelectCase, 1)
// selectCases := make([]reflect.SelectCase, 1)
//selectCases[CHANCHAN] = reflect.SelectCase{
// Dir: reflect.SelectRecv, // readonly
// Chan: reflect.ValueOf(ChanChan),
// Chan: reflect.ValueOf(QueryChanChan),
//}
//selectCases[EVENTCHAN] = reflect.SelectCase{
// Dir: reflect.SelectRecv, // readonly
// Chan: reflect.ValueOf(eventChan),
// Chan: reflect.ValueOf(globalConnEventChan),
//}
//selectCases[CMDCHAN] = reflect.SelectCase{
// Dir: reflect.SelectRecv,
// Chan: reflect.ValueOf(cmdChan),
//}
packagedChan := make(QueryChan)
selectCases[0] = reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(packagedChan),
}
addChan := func(Chan QueryChan) {
logger.Debug("addChan(): adding new chan from chanchan")
nelem := reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(Chan),
}
logger.Debug("addChan(): done")
if unused.IsEmpty() {
selectCases = append(selectCases, nelem)
} else {
selectCases[unused.Pop()] = nelem
}
}
const SIGNAL = 114514
queryThread := func() {
prevId := -1
for {
id, recv, ok := reflect.Select(selectCases)
logger.Debugf("queryThread(): recv message from No.%d", id)
if prevId != -1 {
logger.Debug("queryThread(): clearing previous")
selectCases[prevId].Dir = reflect.SelectRecv
selectCases[prevId].Send = reflect.Value{}
prevId = -1
}
if !ok {
unused.Push(id)
continue
}
if id != 0 {
packagedChan <- MCState(recv.Int())
state, _ := <-packagedChan
selectCases[id].Dir = reflect.SelectSend
selectCases[id].Send = reflect.ValueOf(state)
prevId = id
}
// packagedChan := make(QueryChan)
// selectCases[0] = reflect.SelectCase{
// Dir: reflect.SelectRecv,
// Chan: reflect.ValueOf(packagedChan),
// }
// addChan := func(Chan QueryChan) {
// logger.Debug("addChan(): adding new chan from chanchan")
// nelem := reflect.SelectCase{
// Dir: reflect.SelectRecv,
// Chan: reflect.ValueOf(Chan),
// }
// logger.Debug("addChan(): done")
// if unused.IsEmpty() {
// selectCases = append(selectCases, nelem)
// } else {
// selectCases[unused.Pop()] = nelem
// }
// }
// const SIGNAL = 114514
// queryThread := func() {
// prevId := -1
// for {
// id, recv, ok := reflect.Select(selectCases)
// logger.Debugf("queryThread(): recv message from No.%d", id)
// if prevId != -1 {
// logger.Debug("queryThread(): clearing previous")
// selectCases[prevId].Dir = reflect.SelectRecv
// selectCases[prevId].Send = reflect.Value{}
// prevId = -1
// }
// if !ok {
// unused.Push(id)
// continue
// }
// if id != 0 {
// packagedChan <- MCState(recv.Int())
// state, _ := <-packagedChan
// selectCases[id].Dir = reflect.SelectSend
// selectCases[id].Send = reflect.ValueOf(state)
// prevId = id
// }
}
}
go queryThread()
// }
// }
// go queryThread()
logger.Debug("handleState(): ready")
multiChan := NewDynamicMultiChan[MCState](true)
for {
select {
case nchan, _ := <-ChanChan:
addChan(nchan)
packagedChan <- SIGNAL
case nchan, _ := <-QueryChanChan:
multiChan.Add(nchan)
//addChan(nchan)
// packagedChan <- SIGNAL
//selectCases[id].Dir = reflect.SelectSend
//selectCases[id].Send = reflect.ValueOf(make(QueryChan))
//prevId = id
case event, _ := <-eventChan:
case event, _ := <-globalConnEventChan:
// wait until transformation finishes
switch event {
case INCOMING:
@ -219,8 +215,8 @@ func handleState() { //goroutine only, handles both write and read
panic("handleState():written wrong!" + stateToStr[state])
}
}
case <-packagedChan:
packagedChan <- state
case <-multiChan.RX:
multiChan.TX <- state
}
}

9
go.mod
View File

@ -3,7 +3,14 @@ module minimcd
go 1.23.4
require (
github.com/alecthomas/assert/v2 v2.11.0
github.com/sirupsen/logrus v1.9.3
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
gopkg.in/yaml.v2 v2.4.0
)
require (
github.com/alecthomas/repr v0.4.0 // indirect
github.com/hexops/gotextdiff v1.0.3 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8 // indirect
)

19
typeconv.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"reflect"
"golang.org/x/exp/constraints"
)
func To[T constraints.Ordered](x reflect.Value) T {
if x.Type() == reflect.TypeOf(*new(T)) {
panic("To(): type does not match")
}
ret, ok := x.Interface().(T)
if !ok {
panic("To(): doesn't work")
} else {
return ret
}
}