package main import ( "reflect" inf "github.com/Code-Hex/go-infinity-channel" "golang.org/x/exp/constraints" ) type LinearDS[T any] interface { Push(T) Pop() T Peek() T IsEmpty() bool Length() int } // this DS does not provide thread security type Stack[T any] struct { items []T } func NewStack[T any]() *Stack[T] { return &Stack[T]{ items: make([]T, 0), } } func (s *Stack[T]) Push(item T) { s.items = append(s.items, item) } func (s *Stack[T]) Pop() T { l := len(s.items) if l == 0 { panic("Stack: pop") } ret := s.items[l-1] s.items = s.items[:l-1] return ret } func (s Stack[T]) Peek() T { l := len(s.items) if l == 0 { panic("Stack: peek") } return s.items[l] } func (s Stack[T]) IsEmpty() bool { return len(s.items) == 0 } func (s Stack[T]) Length() int { return len(s.items) } type op int const ( ADD op = iota DELETE ) type Queue[T any] struct { items []T } func NewQueue[T any]() *Queue[T] { return &Queue[T]{ items: make([]T, 0), } } func (s *Queue[T]) Push(item T) { s.items = append(s.items, item) } func (s *Queue[T]) Pop() T { l := len(s.items) if l == 0 { panic("Stack: pop") } ret := s.items[0] s.items = s.items[1:] return ret } func (s Queue[T]) Peek() T { l := len(s.items) if l == 0 { panic("Stack: peek") } return s.items[l] } func (s Queue[T]) IsEmpty() bool { return len(s.items) == 0 } func (s Queue[T]) Length() int { return len(s.items) } type modify_t[T constraints.Ordered] struct { op op id int ch chan T } type DynamicMultiChan[T constraints.Ordered] struct { TX chan T RX chan T reply bool modify *inf.Channel[modify_t[T]] } func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMultiChan[T] { ret := &DynamicMultiChan[T]{ TX: make(chan T), RX: make(chan T), reply: reply, modify: inf.NewChannel[modify_t[T]](), } mode := m reload := make([]chan struct{}, 0) used := make(map[int]bool) unused := make(map[int]bool) // delta := -1 list := make([]chan T, 0) selectCases := make([]reflect.SelectCase, 1) viewCnt := 0 addQue := NewQueue[int]() del := make(map[int]bool) go func() { for { modification, _ := <-ret.modify.Out() op := modification.op id := modification.id ch := modification.ch for _, v := range reload { v <- struct{}{} } switch op { case ADD: nelem := reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), Send: reflect.Value{}, } if len(unused) == 0 { addQue.Push(len(list)) used[len(list)] = true list = append(list, ch) selectCases = append(selectCases, nelem) } else { var which int for x := range unused { which = x break } addQue.Push(which) used[which] = true delete(unused, which) list[which] = ch selectCases[which+1] = nelem } case DELETE: delete(used, id) delete(del, id+1) unused[id] = true list[id] = nil selectCases[id+1] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(NullChan), Send: reflect.Value{}, } //deleted <- struct{}{} // <-reloadTX //extra communication } for _, v := range reload { v <- struct{}{} } } }() { chanId := viewCnt viewCnt++ reload = append(reload, make(chan struct{})) selectCases[0] = reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(reload[chanId]), } prevId := -1 go func() { for { id, recv, ok := reflect.Select(selectCases) if prevId != -1 { selectCases[prevId].Dir = reflect.SelectRecv selectCases[prevId].Send = reflect.Value{} prevId = -1 } if !ok { // ret.used.Push(id) // // delete(used, id) if _, Ok := del[id]; !Ok { del[id] = true ret.modify.In() <- modify_t[T]{DELETE, id - 1, nil} <-reload[chanId] <-reload[chanId] } // reloadRX <- struct{}{} //I'm currently not dealing with other chan //waiting for you finished // reloadRX <- struct{}{} //I've read all deltas, you can release them continue } if mode == 1 && id != 0 { ret.RX <- To[T](recv) // possible for below to send to Chan? if ret.reply { msg, _ := <-ret.TX selectCases[id].Dir = reflect.SelectSend selectCases[id].Send = reflect.ValueOf(msg) prevId = id } } else if id == 0 { // reloadRX <- struct{}{} //I'm currently not dealing with other chan <-reload[chanId] //waiting for you finished // reloadRX <- struct{}{} //I've read all deltas, you can release them } } }() } if mode == 2 { chanId := viewCnt viewCnt++ reload = append(reload, make(chan struct{})) msgList := make([]T, 0) go func() { for { select { case <-reload[chanId]: // reloadRX <- struct{}{} //I'm currently not dealing with other chan <-reload[chanId] //waiting for you finished for !addQue.IsEmpty() { delta := addQue.Pop() for _, x := range msgList { list[delta] <- x } } // reloadRX <- struct{}{} //I've read all deltas, you can release them case msg, _ := <-ret.TX: for id, ch := range list { if used[id] { ch <- msg } } msgList = append(msgList, msg) } } }() } return ret } func (self DynamicMultiChan[T]) IsReply() bool { return self.reply } func (self *DynamicMultiChan[T]) Add(ch chan T) { self.modify.In() <- modify_t[T]{ADD, -1, ch} } // TODO: we need to wait for 2 msg and send 4 msg in chan