package main import ( "bufio" "context" "flag" "log" "net" "os/exec" "strconv" "strings" "sync" "time" "github.com/xtaci/gaio" "versestudios.com/go-telnet-asyncio-test/openconn" ) type InChanKey string type OutChanKey string func ioHandler(w *gaio.Watcher, WatcherControl *chan string) { for { select { case msg := <-*WatcherControl: switch msg { case "": log.Println("empty WatcherControl command") case "stop": log.Println("stopping ioHandler via WatcherControl stop command") return default: log.Println("unknown WatcherControl command:", msg) } default: // loop wait for any IO events results, err := w.WaitIO() if err != nil { log.Println(err) return } IOLoop: for _, res := range results { if res.Context != nil && nil != res.Context.(context.Context) { if inChan, _, ok := openconn.FromContext(res.Context.(context.Context)); ok { switch res.Operation { case gaio.OpRead: // read completion event inChan <- res // queue next read w.Read(res.Context, res.Conn, nil) default: // anything else (meaning write completions) continue IOLoop } } else { log.Printf("error getting inChan and outChan from context: %v\n", res.Context) } } else { log.Printf("nil context! res: %v\n", res) } } } time.Sleep(time.Millisecond) } } func main() { port := flag.Int("port", 3333, "Port to accept connections on.") host := flag.String("host", "127.0.0.1", "Host or IP to bind to") flag.Parse() w, err := gaio.NewWatcher() if err != nil { log.Fatal(err) } defer w.Close() l, err := net.Listen("tcp", *host+":"+strconv.Itoa(*port)) if err != nil { log.Panicln(err) } log.Println("Listening to connections at '"+*host+"' on port", strconv.Itoa(*port)) defer l.Close() for { conn, err := l.Accept() if err != nil { log.Panicln(err) } log.Println("new client: ", conn.RemoteAddr()) // chan to terminate ioHandler when neccessary WatcherControl := make(chan string) // get new context with inChan and outChan for this client connection ctx := openconn.NewContext(context.Background()) log.Printf("new context for client %v: %v\n", conn.RemoteAddr(), ctx) // submit the first async write IO request err = w.Write(ctx, conn, welcomeHandler()) if err != nil { log.Printf("err sending welcomeHandler: %v\n", err) return } // now that a prompt is (or will be) displayed, go ahead and listen for input err = w.Read(ctx, conn, nil) if err != nil { log.Printf("err queueing w.Read: %v\n", err) return } time.Sleep(time.Millisecond) // menu handler for this connection go menuHandler(ctx, conn, w) // io handler for this connection go ioHandler(w, &WatcherControl) log.Println("main thread looping") // wg.Wait() // log.Printf("Goodbye %s!", conn.RemoteAddr()) } } func menuHandler(ctx context.Context, conn net.Conn, w *gaio.Watcher) { var wg sync.WaitGroup MenuControl := make(chan string) var inChan, outChan chan gaio.OpResult var ok bool if inChan, outChan, ok = openconn.FromContext(ctx); !ok { log.Println("Could not get inChan/outChan from context!", ok, ctx) } wg.Add(1) go func(inChan, outChan chan gaio.OpResult, MenuControl chan string) { defer wg.Done() log.Println("starting menu loop") paused := false ControlLoop: for { // log.Println("checking watcher waitio") select { case msg := <-MenuControl: log.Println("msg on MenuControl:", msg) switch string(msg) { case "stop": log.Println("closing menu loop") paused = true break ControlLoop case "pause": log.Println("pausing menu loop via menucontrol") paused = true case "unpause": log.Println("unpausing menu loop via menucontrol") paused = false default: log.Println("menu received unknown command: ", msg) } default: // log.Println("no msg on MenuControl, continuing") if !paused { // log.Println("menu watchercontrol waiting for IO") select { case res := <-inChan: if res.Error != nil { log.Println("error on inChan: ", res.Error) } if res.Operation == gaio.OpRead && res.Size > 0 && res.Conn == conn { log.Printf("received on inChan: conn: %v, buffer size: %v\n", res.Conn.RemoteAddr(), res.Size) log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2]))) switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) { case "welcome": if err := w.Write(ctx, conn, welcomeHandler()); err != nil { log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err) } case "adventure": // start the door and wait log.Println("starting door handler...") var wg sync.WaitGroup wg.Add(1) go doorHandler(ctx, conn, w, &wg) log.Println("menu handler waiting for wg to return from door") wg.Wait() log.Println("returning from door") //MenuControl <- "unpause" err := w.Write(ctx, conn, []byte("\n\n> ")) if err != nil { log.Printf("error writing to connection: %v", err) } case "exit": if err := w.Write(ctx, conn, exitHandler()); err != nil { log.Printf("error sending exitHandler from cmd `exit`: %v\n", err) } else { // let's wait before we actually close the connection to try and ensure the write gets through time.Sleep(time.Second) log.Println("exiting...") log.Printf("Goodbye %s!\n", conn.RemoteAddr()) err := w.Free(conn) if err != nil { log.Printf("err closing connection: %v\n", err) } MenuControl <- "stop" break ControlLoop } default: err := w.Write(ctx, conn, []byte("huh?\n\n> ")) if err != nil { log.Printf("error writing to connection: %v", err) } } } else { // noop log.Printf("received on inChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer)) } /* case res := <-outChan: // log.Println("OpResult on outChan:", res) if res.Error != nil { log.Println("error on outChan: ", res.Error) } if res.Operation == gaio.OpWrite && res.Size > 0 && res.Conn == conn { // write confirmation received //log.Printf("received on outChan: conn: %v, buffer: %v\n", res.Conn.RemoteAddr(), string(res.Buffer)) } else { // noop log.Printf("watcher sent unknown on outChan: %v\n", res) } */ default: // noop - wait a lil bit time.Sleep(time.Millisecond) } if conn == nil { log.Println("conn is nil!") } } } time.Sleep(time.Millisecond) } log.Println("menu controlloop closed!") }(inChan, outChan, MenuControl) log.Println("menu waiting on waitgroup") wg.Wait() log.Println("terminating menucontrol for client") } func welcomeHandler() []byte { const bannerText = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + ` _ __ _ ` + "" + ` | | / /__ / /________ ____ ___ ___` + "" + ` | | /| / / _ \/ / ___/ __ \/ __ '__ \/ _ \` + "" + ` | |/ |/ / __/ / /__/ /_/ / / / / / / __/` + "" + ` |__/|__/\___/_/\___/\____/_/ /_/ /_/\___/` + "\r\n\x1b[33m" + ` __` + "" + ` / /_____ ` + "" + ` / __/ __ \` + "" + ` / /_/ /_/ /` + "" + ` \__/\____/` + "\x1b[35m\r\n" + ` _ __` + "" + ` | | / /__ _____________` + "" + ` | | / / _ \/ ___/ ___/ _ \` + "" + ` | |/ / __/ / (__ ) __/` + "" + ` |___/\___/_/ /____/\___/` + "\x1b[37m\r\n" + ` _____ __ ___` + "" + ` / ___// /___ ______/ (_)___ _____` + "" + ` \__ \/ __/ / / / __ / / __ \/ ___/` + "" + ` ___/ / /_/ /_/ / /_/ / / /_/ (__ ) ` + "" + ` /____/\__/\__,_/\__,_/_/\____/____/ ` + "\x1b[32m\r\n\r\n\r\n" + ` Help is available: type help` + "\r\n\r\n> " return []byte(bannerText) } func exitHandler() []byte { const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + ` ___| | | \___ \ _ \ _ \ | | _ \ | | __| __ \ _' | __| _ \ __| _ \\ \ \ / __ \ _ \ | | | | __/ __/ | | ( | | | \__ \ | | ( | ( __/ ( ( |\ \ \ / | | ( | | |_| _____/ \___|\___| \__, |\___/ \__,_| ) ____/ .__/ \__,_|\___|\___| \___|\___/ \_/\_/ _.__/ \___/ \__, |_) ____/ / _| ____/ ` return []byte(exitMessage) } func doorHandler(ctx context.Context, c net.Conn, w *gaio.Watcher, menuwg *sync.WaitGroup) error { defer menuwg.Done() const bannerText = "\r\nCOLOSSAL CAVE\r\n\r\n" err := w.Write(ctx, c, []byte(bannerText)) if err != nil { log.Printf("error writing to connection: %v\n", err) } var inChan, _ chan gaio.OpResult var ok bool if inChan, _, ok = openconn.FromContext(ctx); !ok { log.Println("Could not get inChan/outChan from context!", ok, ctx) } cmd := exec.Command("/usr/games/adventure") stdout, _ := cmd.StdoutPipe() stdin, _ := cmd.StdinPipe() cmd.Start() var wg sync.WaitGroup wg.Add(1) Terminator := make(chan bool) go func(Terminator *chan bool, wg *sync.WaitGroup) { log.Println("starting stdin reader goroutine") defer wg.Done() Check: for { select { case <-*Terminator: break Check case result, ok := <-inChan: if ok { if result.Operation == gaio.OpRead { // we asked for a read - send it to stdin stdin.Write(result.Buffer[:result.Size]) } } else { log.Println("yikes, problem getting a result from inChan in doorHandler!") } default: time.Sleep(time.Millisecond) } } log.Println("exiting stdin reader goroutine") }(&Terminator, &wg) log.Println("starting stdout reader goroutine") scanner := bufio.NewScanner(stdout) scanner.Split(bufio.ScanLines) for scanner.Scan() { m := scanner.Bytes() m = append(m, '\n') w.Write(ctx, c, m) log.Printf("wrote %d bytes to watcher from door stdout: %v\n", len(m)+1, string(m)) } log.Println("exiting stdout reader goroutine") Terminator <- true log.Println("waiting for wg") wg.Wait() log.Println("waiting for cmd") exitErr := cmd.Wait() if exitErr != nil { log.Printf("cmd exited with err: %v\n", exitErr) } log.Println("leaving door handler") return nil }