move to using contexts to keep channels with connections
This commit is contained in:
parent
30989fd935
commit
1f3c0331e7
341
main.go
341
main.go
@ -1,6 +1,7 @@
|
|||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"flag"
|
"flag"
|
||||||
"log"
|
"log"
|
||||||
"net"
|
"net"
|
||||||
@ -10,8 +11,57 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/xtaci/gaio"
|
"github.com/xtaci/gaio"
|
||||||
|
"versestudios.com/go-telnet-asyncio-test/openconn"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type InChanKey string
|
||||||
|
type OutChanKey string
|
||||||
|
|
||||||
|
func ioHandler(w *gaio.Watcher, WatcherControl *chan string) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case msg := <-*WatcherControl:
|
||||||
|
switch msg {
|
||||||
|
case "":
|
||||||
|
log.Println("empty WatcherControl command")
|
||||||
|
case "stop":
|
||||||
|
log.Println("stopping ioHandler via WatcherControl stop command")
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
log.Println("unknown WatcherControl command:", msg)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
// loop wait for any IO events
|
||||||
|
results, err := w.WaitIO()
|
||||||
|
if err != nil {
|
||||||
|
log.Println(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
IOLoop:
|
||||||
|
for _, res := range results {
|
||||||
|
if res.Context != nil && nil != res.Context.(context.Context) {
|
||||||
|
if inChan, _, ok := openconn.FromContext(res.Context.(context.Context)); ok {
|
||||||
|
switch res.Operation {
|
||||||
|
case gaio.OpRead: // read completion event
|
||||||
|
inChan <- res
|
||||||
|
// queue next read
|
||||||
|
w.Read(res.Context, res.Conn, nil)
|
||||||
|
default: // anything else (meaning write completions)
|
||||||
|
continue IOLoop
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf("error getting inChan and outChan from context: %v\n", res.Context)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
log.Printf("nil context! res: %v\n", res)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
port := flag.Int("port", 3333, "Port to accept connections on.")
|
port := flag.Int("port", 3333, "Port to accept connections on.")
|
||||||
host := flag.String("host", "127.0.0.1", "Host or IP to bind to")
|
host := flag.String("host", "127.0.0.1", "Host or IP to bind to")
|
||||||
@ -37,186 +87,173 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
log.Println("new client: ", conn.RemoteAddr())
|
log.Println("new client: ", conn.RemoteAddr())
|
||||||
|
// chan to terminate ioHandler when neccessary
|
||||||
|
WatcherControl := make(chan string)
|
||||||
|
|
||||||
|
// get new context with inChan and outChan for this client connection
|
||||||
|
ctx := openconn.NewContext(context.Background())
|
||||||
|
log.Printf("new context for client %v: %v\n", conn.RemoteAddr(), ctx)
|
||||||
|
|
||||||
// submit the first async write IO request
|
// submit the first async write IO request
|
||||||
err = w.Write(nil, conn, welcomeHandler())
|
err = w.Write(ctx, conn, welcomeHandler())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("err sending welcomeHandler: %v\n", err)
|
log.Printf("err sending welcomeHandler: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
// hand off channel to menuHandler
|
// now that a prompt is (or will be) displayed, go ahead and listen for input
|
||||||
// menuHandler(w, conn)
|
err = w.Read(ctx, conn, nil)
|
||||||
wg := new(sync.WaitGroup)
|
if err != nil {
|
||||||
|
log.Printf("err queueing w.Read: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// chan to terminate watcher goroutine when necessary
|
time.Sleep(time.Millisecond)
|
||||||
WatcherControl := make(chan string)
|
|
||||||
|
|
||||||
// watcher.WaitIO goroutine
|
// menu handler for this connection
|
||||||
wg.Add(1)
|
go menuHandler(ctx, conn, w)
|
||||||
go func(WatcherControl *chan string) {
|
// io handler for this connection
|
||||||
log.Println("starting menu watchercontrol")
|
go ioHandler(w, &WatcherControl)
|
||||||
defer wg.Done()
|
|
||||||
ControlLoop:
|
|
||||||
for {
|
|
||||||
log.Println("checking watcher waitio")
|
|
||||||
select {
|
|
||||||
case msg := <-*WatcherControl:
|
|
||||||
if msg == "stop" {
|
|
||||||
log.Println("closing menu watchercontrol")
|
|
||||||
break ControlLoop
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
log.Println("menu watchercontrol waiting for IO")
|
|
||||||
inBuf := make([]byte, 256)
|
|
||||||
err := w.ReadTimeout(nil, conn, inBuf, time.Now().Add(time.Second))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("err on watcher readtimeout in menu: %v\n", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
results, err := w.WaitIO()
|
|
||||||
if err != nil {
|
|
||||||
log.Println("watcher wait err: ", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Println("menu received IO!")
|
log.Println("main thread looping")
|
||||||
for _, res := range results {
|
// wg.Wait()
|
||||||
if res.Operation == gaio.OpRead && res.Size > 0 {
|
// log.Printf("Goodbye %s!", conn.RemoteAddr())
|
||||||
log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2])))
|
|
||||||
switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) {
|
|
||||||
case "welcome":
|
|
||||||
if err := w.Write(nil, conn, welcomeHandler()); err != nil {
|
|
||||||
log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err)
|
|
||||||
}
|
|
||||||
case "adventure":
|
|
||||||
// start the door and wait
|
|
||||||
log.Printf("starting door handler...")
|
|
||||||
/*
|
|
||||||
if err := doorHandler(&c); err != nil {
|
|
||||||
log.Printf("error from cmd `adventure`: %v", err)
|
|
||||||
}
|
|
||||||
log.Printf("returning from door")
|
|
||||||
n, err := c.Write([]byte("\n\n> "))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error writing to connection: %v", err)
|
|
||||||
} else {
|
|
||||||
log.Printf("wrote %d byte prompt to connection", n)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
case "exit":
|
|
||||||
/*
|
|
||||||
if err := exitHandler(); err != nil {
|
|
||||||
log.Printf("error sending exitHandler from cmd `exit`: %v", err)
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
default:
|
|
||||||
err := w.Write(nil, conn, []byte("huh?\n\n> "))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error writing to connection: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
log.Println("watcher confirming write:", string(res.Buffer))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
time.Sleep(time.Second)
|
|
||||||
}
|
|
||||||
log.Println("watcher controlloop closed!")
|
|
||||||
}(&WatcherControl)
|
|
||||||
log.Println("main thread waiting on wg")
|
|
||||||
wg.Wait()
|
|
||||||
log.Printf("Goodbye %s!", conn.RemoteAddr())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func menuHandler(w *gaio.Watcher, c net.Conn) {
|
func menuHandler(ctx context.Context, conn net.Conn, w *gaio.Watcher) {
|
||||||
wg := new(sync.WaitGroup)
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
// chan to terminate watcher goroutine when necessary
|
MenuControl := make(chan string)
|
||||||
WatcherControl := make(chan string)
|
|
||||||
|
var inChan, outChan chan gaio.OpResult
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
if inChan, outChan, ok = openconn.FromContext(ctx); !ok {
|
||||||
|
log.Println("Could not get inChan/outChan from context!", ok, ctx)
|
||||||
|
}
|
||||||
|
|
||||||
// watcher.WaitIO goroutine
|
|
||||||
wg.Add(1)
|
wg.Add(1)
|
||||||
go func(WatcherControl *chan string) {
|
go func(inChan, outChan chan gaio.OpResult, MenuControl chan string) {
|
||||||
log.Println("starting menu watchercontrol")
|
|
||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
|
log.Println("starting menu loop")
|
||||||
|
paused := false
|
||||||
ControlLoop:
|
ControlLoop:
|
||||||
for {
|
for {
|
||||||
log.Println("checking watcher waitio")
|
// log.Println("checking watcher waitio")
|
||||||
select {
|
select {
|
||||||
case msg := <-*WatcherControl:
|
case msg := <-MenuControl:
|
||||||
if msg == "stop" {
|
log.Println("msg on MenuControl:", msg)
|
||||||
log.Println("closing menu watchercontrol")
|
switch string(msg) {
|
||||||
|
|
||||||
|
case "stop":
|
||||||
|
log.Println("closing menu loop")
|
||||||
|
paused = true
|
||||||
break ControlLoop
|
break ControlLoop
|
||||||
}
|
|
||||||
default:
|
case "pause":
|
||||||
log.Println("menu watchercontrol waiting for IO")
|
log.Println("pausing menu loop via menucontrol")
|
||||||
results, err := w.WaitIO()
|
paused = true
|
||||||
if err != nil {
|
|
||||||
log.Println(err)
|
case "unpause":
|
||||||
return
|
log.Println("unpausing menu loop via menucontrol")
|
||||||
|
paused = false
|
||||||
|
|
||||||
|
default:
|
||||||
|
log.Println("menu received unknown command: ", msg)
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Println("menu received IO!")
|
default:
|
||||||
for _, res := range results {
|
// log.Println("no msg on MenuControl, continuing")
|
||||||
if res.Operation == gaio.OpRead {
|
if !paused {
|
||||||
log.Println("menu receive: ", string(res.Buffer))
|
// log.Println("menu watchercontrol waiting for IO")
|
||||||
switch string(res.Buffer) {
|
select {
|
||||||
case "":
|
case res := <-inChan:
|
||||||
// noop
|
if res.Error != nil {
|
||||||
case "welcome":
|
log.Println("error on inChan: ", res.Error)
|
||||||
if err := w.Write(nil, c, welcomeHandler()); err != nil {
|
}
|
||||||
log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err)
|
if res.Operation == gaio.OpRead && res.Size > 0 && res.Conn == conn {
|
||||||
}
|
log.Printf("received on inChan: conn: %v, buffer size: %v\n", res.Conn.RemoteAddr(), res.Size)
|
||||||
case "adventure":
|
log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2])))
|
||||||
// start the door and wait
|
switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) {
|
||||||
log.Printf("starting door handler...")
|
|
||||||
/*
|
case "welcome":
|
||||||
if err := doorHandler(&c); err != nil {
|
if err := w.Write(ctx, conn, welcomeHandler()); err != nil {
|
||||||
log.Printf("error from cmd `adventure`: %v", err)
|
log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case "adventure":
|
||||||
|
// start the door and wait
|
||||||
|
log.Printf("starting simulated door handler...")
|
||||||
|
MenuControl <- "pause"
|
||||||
|
// if err := doorHandler(w); err != nil {
|
||||||
|
// log.Printf("error from cmd `adventure`: %v", err)
|
||||||
|
// }
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
log.Printf("returning from door")
|
log.Printf("returning from door")
|
||||||
n, err := c.Write([]byte("\n\n> "))
|
MenuControl <- "unpause"
|
||||||
|
err := w.Write(ctx, conn, []byte("\n\n> "))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("error writing to connection: %v", err)
|
log.Printf("error writing to connection: %v", err)
|
||||||
} else {
|
|
||||||
log.Printf("wrote %d byte prompt to connection", n)
|
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
case "exit":
|
case "exit":
|
||||||
/*
|
if err := w.Write(ctx, conn, exitHandler()); err != nil {
|
||||||
if err := exitHandler(); err != nil {
|
|
||||||
log.Printf("error sending exitHandler from cmd `exit`: %v", err)
|
log.Printf("error sending exitHandler from cmd `exit`: %v", err)
|
||||||
|
} else {
|
||||||
|
// let's wait before we actually close the connection to try and ensure the write gets through
|
||||||
|
time.Sleep(time.Second)
|
||||||
|
log.Println("exiting...")
|
||||||
|
log.Printf("Goodbye %s!", conn.RemoteAddr())
|
||||||
|
err := w.Free(conn)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("err closing connection: %v\n", err)
|
||||||
|
}
|
||||||
|
MenuControl <- "stop"
|
||||||
|
break ControlLoop
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
|
||||||
|
err := w.Write(ctx, conn, []byte("huh?\n\n> "))
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("error writing to connection: %v", err)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
default:
|
|
||||||
err := w.Write(nil, c, []byte("huh?\n\n> "))
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error writing to connection: %v", err)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// noop
|
||||||
|
log.Printf("received on inChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer))
|
||||||
}
|
}
|
||||||
} else {
|
/*
|
||||||
log.Println("watcher confirming write:", string(res.Buffer))
|
case res := <-outChan:
|
||||||
|
// log.Println("OpResult on outChan:", res)
|
||||||
|
if res.Error != nil {
|
||||||
|
log.Println("error on outChan: ", res.Error)
|
||||||
|
}
|
||||||
|
if res.Operation == gaio.OpWrite && res.Size > 0 && res.Conn == conn {
|
||||||
|
// write confirmation received
|
||||||
|
//log.Printf("received on outChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer))
|
||||||
|
} else {
|
||||||
|
// noop
|
||||||
|
log.Printf("watcher sent unknown on outChan: %v\n", res)
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
default:
|
||||||
|
// noop - wait a lil bit
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
if conn == nil {
|
||||||
|
log.Println("conn is nil!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
time.Sleep(time.Second)
|
time.Sleep(time.Millisecond)
|
||||||
}
|
}
|
||||||
log.Println("watcher controlloop closed!")
|
log.Println("menu controlloop closed!")
|
||||||
}(&WatcherControl)
|
}(inChan, outChan, MenuControl)
|
||||||
log.Println("main thread waiting on wg")
|
|
||||||
wg.Wait()
|
|
||||||
log.Printf("Goodbye %s!", c.RemoteAddr())
|
|
||||||
}
|
|
||||||
|
|
||||||
func sendClientText(w *gaio.Watcher, c net.Conn, s string) error {
|
log.Println("menu waiting on waitgroup")
|
||||||
err := w.Write(nil, c, []byte(s))
|
wg.Wait()
|
||||||
if err != nil {
|
log.Println("terminating menucontrol for client")
|
||||||
log.Printf("error writing to connection: %v", err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func welcomeHandler() []byte {
|
func welcomeHandler() []byte {
|
||||||
@ -247,10 +284,7 @@ func welcomeHandler() []byte {
|
|||||||
return []byte(bannerText)
|
return []byte(bannerText)
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
func exitHandler() []byte {
|
||||||
|
|
||||||
|
|
||||||
func exitHandler(c net.Conn) error {
|
|
||||||
const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + `
|
const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + `
|
||||||
|
|
||||||
___| | |
|
___| | |
|
||||||
@ -261,16 +295,13 @@ func exitHandler(c net.Conn) error {
|
|||||||
|
|
||||||
|
|
||||||
`
|
`
|
||||||
err := sendClientText(c, exitMessage)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("error writing to connection: %v", err)
|
|
||||||
}
|
|
||||||
time.Sleep(500 * time.Millisecond)
|
|
||||||
|
|
||||||
return c.Close()
|
return []byte(exitMessage)
|
||||||
}
|
}
|
||||||
|
|
||||||
func doorHandler(c *net.Conn) error {
|
// rework to mimic mainHandler
|
||||||
|
/*
|
||||||
|
func doorHandler(w *gaio.Watcher) error {
|
||||||
const bannerText = "\r\nCOLOSSAL CAVE\r\n\r\n"
|
const bannerText = "\r\nCOLOSSAL CAVE\r\n\r\n"
|
||||||
err := sendClientText(*c, bannerText)
|
err := sendClientText(*c, bannerText)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
46
openconn/openconn.go
Normal file
46
openconn/openconn.go
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
package openconn
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
|
||||||
|
"github.com/xtaci/gaio"
|
||||||
|
)
|
||||||
|
|
||||||
|
// The key type is unexported to prevent collisions with context keys defined in
|
||||||
|
// other packages.
|
||||||
|
type key int
|
||||||
|
|
||||||
|
const inChanKey key = 0
|
||||||
|
const outChanKey key = 0
|
||||||
|
|
||||||
|
// NewContext returns a new Context carrying two new channels.
|
||||||
|
func NewContext(ctx context.Context) context.Context {
|
||||||
|
inChan := make(chan gaio.OpResult)
|
||||||
|
outChan := make(chan gaio.OpResult)
|
||||||
|
ctx = context.WithValue(ctx, inChanKey, inChan)
|
||||||
|
ctx = context.WithValue(ctx, outChanKey, outChan)
|
||||||
|
return ctx
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromContext extracts the two channels from ctx, if present.
|
||||||
|
func FromContext(ctx context.Context) (chan gaio.OpResult, chan gaio.OpResult, bool) {
|
||||||
|
// ctx.Value returns nil if ctx has no value for the key;
|
||||||
|
// the type assertion returns ok=false for nil.
|
||||||
|
inChan, iok := ctx.Value(inChanKey).(chan gaio.OpResult)
|
||||||
|
outChan, ook := ctx.Value(outChanKey).(chan gaio.OpResult)
|
||||||
|
if iok && ook {
|
||||||
|
return inChan, outChan, iok && ook
|
||||||
|
}
|
||||||
|
return nil, nil, iok && ook
|
||||||
|
}
|
||||||
|
|
||||||
|
// FromOpResult extracts the two channels from a gaio.OpResult, if present.
|
||||||
|
func FromOpResult(res gaio.OpResult) (chan gaio.OpResult, chan gaio.OpResult, bool) {
|
||||||
|
ctx, ok := res.Context.(context.Context)
|
||||||
|
if ok {
|
||||||
|
inChan, iok := ctx.Value(inChanKey).(chan gaio.OpResult)
|
||||||
|
outChan, ook := ctx.Value(outChanKey).(chan gaio.OpResult)
|
||||||
|
return inChan, outChan, iok && ook
|
||||||
|
}
|
||||||
|
return nil, nil, ok
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user