diff --git a/main.go b/main.go index db71819..b2beb2f 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "log" "net" @@ -10,8 +11,57 @@ import ( "time" "github.com/xtaci/gaio" + "versestudios.com/go-telnet-asyncio-test/openconn" ) +type InChanKey string +type OutChanKey string + +func ioHandler(w *gaio.Watcher, WatcherControl *chan string) { + for { + select { + case msg := <-*WatcherControl: + switch msg { + case "": + log.Println("empty WatcherControl command") + case "stop": + log.Println("stopping ioHandler via WatcherControl stop command") + return + default: + log.Println("unknown WatcherControl command:", msg) + } + default: + // loop wait for any IO events + results, err := w.WaitIO() + if err != nil { + log.Println(err) + return + } + + IOLoop: + for _, res := range results { + if res.Context != nil && nil != res.Context.(context.Context) { + if inChan, _, ok := openconn.FromContext(res.Context.(context.Context)); ok { + switch res.Operation { + case gaio.OpRead: // read completion event + inChan <- res + // queue next read + w.Read(res.Context, res.Conn, nil) + default: // anything else (meaning write completions) + continue IOLoop + } + } else { + log.Printf("error getting inChan and outChan from context: %v\n", res.Context) + } + } else { + log.Printf("nil context! res: %v\n", res) + } + } + } + time.Sleep(time.Millisecond) + } +} + func main() { port := flag.Int("port", 3333, "Port to accept connections on.") host := flag.String("host", "127.0.0.1", "Host or IP to bind to") @@ -37,186 +87,173 @@ func main() { } log.Println("new client: ", conn.RemoteAddr()) + // chan to terminate ioHandler when neccessary + WatcherControl := make(chan string) + + // get new context with inChan and outChan for this client connection + ctx := openconn.NewContext(context.Background()) + log.Printf("new context for client %v: %v\n", conn.RemoteAddr(), ctx) // submit the first async write IO request - err = w.Write(nil, conn, welcomeHandler()) + err = w.Write(ctx, conn, welcomeHandler()) if err != nil { log.Printf("err sending welcomeHandler: %v\n", err) return } - // hand off channel to menuHandler - // menuHandler(w, conn) - wg := new(sync.WaitGroup) + // now that a prompt is (or will be) displayed, go ahead and listen for input + err = w.Read(ctx, conn, nil) + if err != nil { + log.Printf("err queueing w.Read: %v\n", err) + return + } - // chan to terminate watcher goroutine when necessary - WatcherControl := make(chan string) + time.Sleep(time.Millisecond) - // watcher.WaitIO goroutine - wg.Add(1) - go func(WatcherControl *chan string) { - log.Println("starting menu watchercontrol") - defer wg.Done() - ControlLoop: - for { - log.Println("checking watcher waitio") - select { - case msg := <-*WatcherControl: - if msg == "stop" { - log.Println("closing menu watchercontrol") - break ControlLoop - } - default: - log.Println("menu watchercontrol waiting for IO") - inBuf := make([]byte, 256) - err := w.ReadTimeout(nil, conn, inBuf, time.Now().Add(time.Second)) - if err != nil { - log.Printf("err on watcher readtimeout in menu: %v\n", err) - continue - } - results, err := w.WaitIO() - if err != nil { - log.Println("watcher wait err: ", err) - continue - } + // menu handler for this connection + go menuHandler(ctx, conn, w) + // io handler for this connection + go ioHandler(w, &WatcherControl) - log.Println("menu received IO!") - for _, res := range results { - if res.Operation == gaio.OpRead && res.Size > 0 { - log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2]))) - switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) { - case "welcome": - if err := w.Write(nil, conn, welcomeHandler()); err != nil { - log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err) - } - case "adventure": - // start the door and wait - log.Printf("starting door handler...") - /* - if err := doorHandler(&c); err != nil { - log.Printf("error from cmd `adventure`: %v", err) - } - log.Printf("returning from door") - n, err := c.Write([]byte("\n\n> ")) - if err != nil { - log.Printf("error writing to connection: %v", err) - } else { - log.Printf("wrote %d byte prompt to connection", n) - } - */ - case "exit": - /* - if err := exitHandler(); err != nil { - log.Printf("error sending exitHandler from cmd `exit`: %v", err) - } - */ - default: - err := w.Write(nil, conn, []byte("huh?\n\n> ")) - if err != nil { - log.Printf("error writing to connection: %v", err) - } - } - } else { - log.Println("watcher confirming write:", string(res.Buffer)) - } - } - } - time.Sleep(time.Second) - } - log.Println("watcher controlloop closed!") - }(&WatcherControl) - log.Println("main thread waiting on wg") - wg.Wait() - log.Printf("Goodbye %s!", conn.RemoteAddr()) + log.Println("main thread looping") + // wg.Wait() + // log.Printf("Goodbye %s!", conn.RemoteAddr()) } } -func menuHandler(w *gaio.Watcher, c net.Conn) { - wg := new(sync.WaitGroup) +func menuHandler(ctx context.Context, conn net.Conn, w *gaio.Watcher) { + var wg sync.WaitGroup - // chan to terminate watcher goroutine when necessary - WatcherControl := make(chan string) + MenuControl := make(chan string) + + var inChan, outChan chan gaio.OpResult + var ok bool + + if inChan, outChan, ok = openconn.FromContext(ctx); !ok { + log.Println("Could not get inChan/outChan from context!", ok, ctx) + } - // watcher.WaitIO goroutine wg.Add(1) - go func(WatcherControl *chan string) { - log.Println("starting menu watchercontrol") + go func(inChan, outChan chan gaio.OpResult, MenuControl chan string) { defer wg.Done() + log.Println("starting menu loop") + paused := false ControlLoop: for { - log.Println("checking watcher waitio") + // log.Println("checking watcher waitio") select { - case msg := <-*WatcherControl: - if msg == "stop" { - log.Println("closing menu watchercontrol") + case msg := <-MenuControl: + log.Println("msg on MenuControl:", msg) + switch string(msg) { + + case "stop": + log.Println("closing menu loop") + paused = true break ControlLoop - } - default: - log.Println("menu watchercontrol waiting for IO") - results, err := w.WaitIO() - if err != nil { - log.Println(err) - return + + case "pause": + log.Println("pausing menu loop via menucontrol") + paused = true + + case "unpause": + log.Println("unpausing menu loop via menucontrol") + paused = false + + default: + log.Println("menu received unknown command: ", msg) } - log.Println("menu received IO!") - for _, res := range results { - if res.Operation == gaio.OpRead { - log.Println("menu receive: ", string(res.Buffer)) - switch string(res.Buffer) { - case "": - // noop - case "welcome": - if err := w.Write(nil, c, welcomeHandler()); err != nil { - log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err) - } - case "adventure": - // start the door and wait - log.Printf("starting door handler...") - /* - if err := doorHandler(&c); err != nil { - log.Printf("error from cmd `adventure`: %v", err) + default: + // log.Println("no msg on MenuControl, continuing") + if !paused { + // log.Println("menu watchercontrol waiting for IO") + select { + case res := <-inChan: + if res.Error != nil { + log.Println("error on inChan: ", res.Error) + } + if res.Operation == gaio.OpRead && res.Size > 0 && res.Conn == conn { + log.Printf("received on inChan: conn: %v, buffer size: %v\n", res.Conn.RemoteAddr(), res.Size) + log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2]))) + switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) { + + case "welcome": + if err := w.Write(ctx, conn, welcomeHandler()); err != nil { + log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err) } + + case "adventure": + // start the door and wait + log.Printf("starting simulated door handler...") + MenuControl <- "pause" + // if err := doorHandler(w); err != nil { + // log.Printf("error from cmd `adventure`: %v", err) + // } + time.Sleep(5 * time.Second) log.Printf("returning from door") - n, err := c.Write([]byte("\n\n> ")) + MenuControl <- "unpause" + err := w.Write(ctx, conn, []byte("\n\n> ")) if err != nil { log.Printf("error writing to connection: %v", err) - } else { - log.Printf("wrote %d byte prompt to connection", n) } - */ - case "exit": - /* - if err := exitHandler(); err != nil { + + case "exit": + if err := w.Write(ctx, conn, exitHandler()); err != nil { log.Printf("error sending exitHandler from cmd `exit`: %v", err) + } else { + // let's wait before we actually close the connection to try and ensure the write gets through + time.Sleep(time.Second) + log.Println("exiting...") + log.Printf("Goodbye %s!", conn.RemoteAddr()) + err := w.Free(conn) + if err != nil { + log.Printf("err closing connection: %v\n", err) + } + MenuControl <- "stop" + break ControlLoop + } + default: + + err := w.Write(ctx, conn, []byte("huh?\n\n> ")) + if err != nil { + log.Printf("error writing to connection: %v", err) } - */ - default: - err := w.Write(nil, c, []byte("huh?\n\n> ")) - if err != nil { - log.Printf("error writing to connection: %v", err) } + + } else { + // noop + log.Printf("received on inChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer)) } - } else { - log.Println("watcher confirming write:", string(res.Buffer)) + /* + case res := <-outChan: + // log.Println("OpResult on outChan:", res) + if res.Error != nil { + log.Println("error on outChan: ", res.Error) + } + if res.Operation == gaio.OpWrite && res.Size > 0 && res.Conn == conn { + // write confirmation received + //log.Printf("received on outChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer)) + } else { + // noop + log.Printf("watcher sent unknown on outChan: %v\n", res) + } + */ + default: + // noop - wait a lil bit + time.Sleep(time.Millisecond) + } + if conn == nil { + log.Println("conn is nil!") } } } - time.Sleep(time.Second) + time.Sleep(time.Millisecond) } - log.Println("watcher controlloop closed!") - }(&WatcherControl) - log.Println("main thread waiting on wg") - wg.Wait() - log.Printf("Goodbye %s!", c.RemoteAddr()) -} + log.Println("menu controlloop closed!") + }(inChan, outChan, MenuControl) -func sendClientText(w *gaio.Watcher, c net.Conn, s string) error { - err := w.Write(nil, c, []byte(s)) - if err != nil { - log.Printf("error writing to connection: %v", err) - return err - } - return nil + log.Println("menu waiting on waitgroup") + wg.Wait() + log.Println("terminating menucontrol for client") } func welcomeHandler() []byte { @@ -247,10 +284,7 @@ func welcomeHandler() []byte { return []byte(bannerText) } -/* - - -func exitHandler(c net.Conn) error { +func exitHandler() []byte { const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + ` ___| | | @@ -261,16 +295,13 @@ func exitHandler(c net.Conn) error { ` - err := sendClientText(c, exitMessage) - if err != nil { - log.Printf("error writing to connection: %v", err) - } - time.Sleep(500 * time.Millisecond) - return c.Close() + return []byte(exitMessage) } -func doorHandler(c *net.Conn) error { +// rework to mimic mainHandler +/* +func doorHandler(w *gaio.Watcher) error { const bannerText = "\r\nCOLOSSAL CAVE\r\n\r\n" err := sendClientText(*c, bannerText) if err != nil { diff --git a/openconn/openconn.go b/openconn/openconn.go new file mode 100644 index 0000000..aca69fa --- /dev/null +++ b/openconn/openconn.go @@ -0,0 +1,46 @@ +package openconn + +import ( + "context" + + "github.com/xtaci/gaio" +) + +// The key type is unexported to prevent collisions with context keys defined in +// other packages. +type key int + +const inChanKey key = 0 +const outChanKey key = 0 + +// NewContext returns a new Context carrying two new channels. +func NewContext(ctx context.Context) context.Context { + inChan := make(chan gaio.OpResult) + outChan := make(chan gaio.OpResult) + ctx = context.WithValue(ctx, inChanKey, inChan) + ctx = context.WithValue(ctx, outChanKey, outChan) + return ctx +} + +// FromContext extracts the two channels from ctx, if present. +func FromContext(ctx context.Context) (chan gaio.OpResult, chan gaio.OpResult, bool) { + // ctx.Value returns nil if ctx has no value for the key; + // the type assertion returns ok=false for nil. + inChan, iok := ctx.Value(inChanKey).(chan gaio.OpResult) + outChan, ook := ctx.Value(outChanKey).(chan gaio.OpResult) + if iok && ook { + return inChan, outChan, iok && ook + } + return nil, nil, iok && ook +} + +// FromOpResult extracts the two channels from a gaio.OpResult, if present. +func FromOpResult(res gaio.OpResult) (chan gaio.OpResult, chan gaio.OpResult, bool) { + ctx, ok := res.Context.(context.Context) + if ok { + inChan, iok := ctx.Value(inChanKey).(chan gaio.OpResult) + outChan, ook := ctx.Value(outChanKey).(chan gaio.OpResult) + return inChan, outChan, iok && ook + } + return nil, nil, ok +}