267 lines
5.4 KiB
Go
267 lines
5.4 KiB
Go
package main
|
|
|
|
import (
|
|
"reflect"
|
|
|
|
inf "github.com/Code-Hex/go-infinity-channel"
|
|
"golang.org/x/exp/constraints"
|
|
)
|
|
|
|
type LinearDS[T any] interface {
|
|
Push(T)
|
|
Pop() T
|
|
Peek() T
|
|
IsEmpty() bool
|
|
Length() int
|
|
}
|
|
|
|
// this DS does not provide thread security
|
|
type Stack[T any] struct {
|
|
items []T
|
|
}
|
|
|
|
func NewStack[T any]() *Stack[T] {
|
|
return &Stack[T]{
|
|
items: make([]T, 0),
|
|
}
|
|
}
|
|
|
|
func (s *Stack[T]) Push(item T) {
|
|
s.items = append(s.items, item)
|
|
}
|
|
func (s *Stack[T]) Pop() T {
|
|
l := len(s.items)
|
|
if l == 0 {
|
|
panic("Stack: pop")
|
|
}
|
|
ret := s.items[l-1]
|
|
s.items = s.items[:l-1]
|
|
return ret
|
|
}
|
|
func (s Stack[T]) Peek() T {
|
|
l := len(s.items)
|
|
if l == 0 {
|
|
panic("Stack: peek")
|
|
}
|
|
return s.items[l]
|
|
}
|
|
func (s Stack[T]) IsEmpty() bool {
|
|
return len(s.items) == 0
|
|
}
|
|
func (s Stack[T]) Length() int {
|
|
return len(s.items)
|
|
}
|
|
|
|
type op int
|
|
|
|
const (
|
|
ADD op = iota
|
|
DELETE
|
|
)
|
|
|
|
type Queue[T any] struct {
|
|
items []T
|
|
}
|
|
|
|
func NewQueue[T any]() *Queue[T] {
|
|
return &Queue[T]{
|
|
items: make([]T, 0),
|
|
}
|
|
}
|
|
|
|
func (s *Queue[T]) Push(item T) {
|
|
s.items = append(s.items, item)
|
|
}
|
|
func (s *Queue[T]) Pop() T {
|
|
l := len(s.items)
|
|
if l == 0 {
|
|
panic("Stack: pop")
|
|
}
|
|
ret := s.items[0]
|
|
s.items = s.items[1:]
|
|
return ret
|
|
}
|
|
func (s Queue[T]) Peek() T {
|
|
l := len(s.items)
|
|
if l == 0 {
|
|
panic("Stack: peek")
|
|
}
|
|
return s.items[l]
|
|
}
|
|
func (s Queue[T]) IsEmpty() bool {
|
|
return len(s.items) == 0
|
|
}
|
|
func (s Queue[T]) Length() int {
|
|
return len(s.items)
|
|
}
|
|
|
|
type modify_t[T constraints.Ordered] struct {
|
|
op op
|
|
id int
|
|
ch chan T
|
|
}
|
|
type DynamicMultiChan[T constraints.Ordered] struct {
|
|
TX chan T
|
|
RX chan T
|
|
reply bool
|
|
modify *inf.Channel[modify_t[T]]
|
|
}
|
|
|
|
func NewDynamicMultiChan[T constraints.Ordered](reply bool, m int) *DynamicMultiChan[T] {
|
|
ret := &DynamicMultiChan[T]{
|
|
TX: make(chan T),
|
|
RX: make(chan T),
|
|
|
|
reply: reply,
|
|
modify: inf.NewChannel[modify_t[T]](),
|
|
}
|
|
mode := m
|
|
reload := make([]chan struct{}, 0)
|
|
used := make(map[int]bool)
|
|
unused := make(map[int]bool)
|
|
// delta := -1
|
|
list := make([]chan T, 0)
|
|
selectCases := make([]reflect.SelectCase, 1)
|
|
viewCnt := 0
|
|
addQue := NewQueue[int]()
|
|
|
|
del := make(map[int]bool)
|
|
go func() {
|
|
for {
|
|
modification, _ := <-ret.modify.Out()
|
|
op := modification.op
|
|
id := modification.id
|
|
ch := modification.ch
|
|
for _, v := range reload {
|
|
v <- struct{}{}
|
|
}
|
|
switch op {
|
|
case ADD:
|
|
nelem := reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(ch),
|
|
Send: reflect.Value{},
|
|
}
|
|
if len(unused) == 0 {
|
|
addQue.Push(len(list))
|
|
used[len(list)] = true
|
|
list = append(list, ch)
|
|
selectCases = append(selectCases, nelem)
|
|
} else {
|
|
var which int
|
|
for x := range unused {
|
|
which = x
|
|
break
|
|
}
|
|
addQue.Push(which)
|
|
used[which] = true
|
|
delete(unused, which)
|
|
list[which] = ch
|
|
selectCases[which+1] = nelem
|
|
}
|
|
case DELETE:
|
|
delete(used, id)
|
|
delete(del, id+1)
|
|
unused[id] = true
|
|
list[id] = nil
|
|
selectCases[id+1] = reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(NullChan),
|
|
Send: reflect.Value{},
|
|
}
|
|
//deleted <- struct{}{} // <-reloadTX //extra communication
|
|
}
|
|
for _, v := range reload {
|
|
v <- struct{}{}
|
|
}
|
|
|
|
}
|
|
}()
|
|
{
|
|
chanId := viewCnt
|
|
viewCnt++
|
|
reload = append(reload, make(chan struct{}))
|
|
selectCases[0] = reflect.SelectCase{
|
|
Dir: reflect.SelectRecv,
|
|
Chan: reflect.ValueOf(reload[chanId]),
|
|
}
|
|
prevId := -1
|
|
go func() {
|
|
for {
|
|
id, recv, ok := reflect.Select(selectCases)
|
|
if prevId != -1 {
|
|
selectCases[prevId].Dir = reflect.SelectRecv
|
|
selectCases[prevId].Send = reflect.Value{}
|
|
prevId = -1
|
|
}
|
|
if !ok {
|
|
// ret.used.Push(id)
|
|
//
|
|
// delete(used, id)
|
|
if _, Ok := del[id]; !Ok {
|
|
del[id] = true
|
|
ret.modify.In() <- modify_t[T]{DELETE, id - 1, nil}
|
|
<-reload[chanId]
|
|
<-reload[chanId]
|
|
}
|
|
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
|
//waiting for you finished
|
|
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
|
continue
|
|
}
|
|
if mode == 1 && id != 0 {
|
|
ret.RX <- To[T](recv) // possible for below to send to Chan?
|
|
if ret.reply {
|
|
msg, _ := <-ret.TX
|
|
selectCases[id].Dir = reflect.SelectSend
|
|
selectCases[id].Send = reflect.ValueOf(msg)
|
|
prevId = id
|
|
}
|
|
} else if id == 0 {
|
|
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
|
<-reload[chanId] //waiting for you finished
|
|
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
if mode == 2 {
|
|
chanId := viewCnt
|
|
viewCnt++
|
|
reload = append(reload, make(chan struct{}))
|
|
msgList := make([]T, 0)
|
|
go func() {
|
|
for {
|
|
select {
|
|
case <-reload[chanId]:
|
|
// reloadRX <- struct{}{} //I'm currently not dealing with other chan
|
|
<-reload[chanId] //waiting for you finished
|
|
for !addQue.IsEmpty() {
|
|
delta := addQue.Pop()
|
|
for _, x := range msgList {
|
|
list[delta] <- x
|
|
}
|
|
}
|
|
// reloadRX <- struct{}{} //I've read all deltas, you can release them
|
|
case msg, _ := <-ret.TX:
|
|
for id, ch := range list {
|
|
if used[id] {
|
|
ch <- msg
|
|
}
|
|
}
|
|
msgList = append(msgList, msg)
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
return ret
|
|
}
|
|
func (self DynamicMultiChan[T]) IsReply() bool {
|
|
return self.reply
|
|
}
|
|
func (self *DynamicMultiChan[T]) Add(ch chan T) {
|
|
self.modify.In() <- modify_t[T]{ADD, -1, ch}
|
|
}
|
|
|
|
// TODO: we need to wait for 2 msg and send 4 msg in chan
|