first pass at using gaio for non-blocking net reads

This commit is contained in:
Sundog Jones 2021-04-14 08:25:41 -07:00
parent f0a810c330
commit 30989fd935
2 changed files with 169 additions and 86 deletions

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/xtaci/gaio v1.2.9 h1:EuVc7Q2JDzIY2mk5mjtq4K5BgTuO+kj5LXzCwjOK+mo=
github.com/xtaci/gaio v1.2.9/go.mod h1:rJMerwiLCLnKa14YTM/sRggTPrnBZrlCg9U3DnV5VBE=

205
main.go
View File

@ -1,13 +1,11 @@
package main package main
import ( import (
"bufio"
"flag" "flag"
"io"
"log" "log"
"net" "net"
"os/exec"
"strconv" "strconv"
"strings"
"sync" "sync"
"time" "time"
@ -38,65 +36,62 @@ func main() {
log.Panicln(err) log.Panicln(err)
} }
log.Println("new client", conn.RemoteAddr()) log.Println("new client: ", conn.RemoteAddr())
// set up io channel for this connection
chIO := make(chan gaio.OpResult)
// submit the first async write IO request // submit the first async write IO request
err = (chIO, conn, welcomeHandler()) err = w.Write(nil, conn, welcomeHandler())
if err != nil { if err != nil {
log.Printf("err sending welcomeHandler: %v\n", err) log.Printf("err sending welcomeHandler: %v\n", err)
return return
} }
// hand off channel to menuHandler // hand off channel to menuHandler
menuHandler(chIO, conn, *w) // menuHandler(w, conn)
}
}
func menuHandler(chIO chan gaio.OpResult, c net.Conn, w gaio.Watcher) {
log.Printf("Connection received: %s", c.RemoteAddr())
// watcher.WaitIO goroutine
go func() {
for {
results, err := w.WaitIO()
if err != nil {
log.Println(err)
return
}
for _, res := range results {
chIO <- res
}
}
}()
// send welcome banner
if err := welcomeHandler(&c); err != nil {
log.Printf("error sending welcomeHandler on initial connect: %v", err)
}
wg := new(sync.WaitGroup) wg := new(sync.WaitGroup)
// chan to terminate watcher goroutine when necessary
WatcherControl := make(chan string)
// watcher.WaitIO goroutine
wg.Add(1) wg.Add(1)
go func() { go func(WatcherControl *chan string) {
log.Println("starting menu watchercontrol")
defer wg.Done() defer wg.Done()
scanner := bufio.NewScanner(c) ControlLoop:
scanner.Split(bufio.ScanLines) for {
log.Println("checking watcher waitio")
select {
case msg := <-*WatcherControl:
if msg == "stop" {
log.Println("closing menu watchercontrol")
break ControlLoop
}
default:
log.Println("menu watchercontrol waiting for IO")
inBuf := make([]byte, 256)
err := w.ReadTimeout(nil, conn, inBuf, time.Now().Add(time.Second))
if err != nil {
log.Printf("err on watcher readtimeout in menu: %v\n", err)
continue
}
results, err := w.WaitIO()
if err != nil {
log.Println("watcher wait err: ", err)
continue
}
for scanner.Scan() { log.Println("menu received IO!")
line := scanner.Bytes() for _, res := range results {
if res.Operation == gaio.OpRead && res.Size > 0 {
log.Printf("Menu received line: %v (%v)", line, string(line)) log.Println("menu receive: ", strings.TrimSpace(string(res.Buffer[:res.Size-2])))
switch string(line) { switch string(strings.TrimSpace(string(res.Buffer[:res.Size-2]))) {
case "welcome": case "welcome":
if err := welcomeHandler(&c); err != nil { if err := w.Write(nil, conn, welcomeHandler()); err != nil {
log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err) log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err)
} }
case "adventure": case "adventure":
// start the door and wait // start the door and wait
log.Printf("starting door handler...") log.Printf("starting door handler...")
/*
if err := doorHandler(&c); err != nil { if err := doorHandler(&c); err != nil {
log.Printf("error from cmd `adventure`: %v", err) log.Printf("error from cmd `adventure`: %v", err)
} }
@ -107,39 +102,124 @@ func menuHandler(chIO chan gaio.OpResult, c net.Conn, w gaio.Watcher) {
} else { } else {
log.Printf("wrote %d byte prompt to connection", n) log.Printf("wrote %d byte prompt to connection", n)
} }
*/
case "exit": case "exit":
if err := exitHandler(c); err != nil { /*
if err := exitHandler(); err != nil {
log.Printf("error sending exitHandler from cmd `exit`: %v", err) log.Printf("error sending exitHandler from cmd `exit`: %v", err)
} }
*/
default: default:
n, err := c.Write([]byte("huh?\n\n> ")) err := w.Write(nil, conn, []byte("huh?\n\n> "))
if err != nil {
log.Printf("error writing to connection: %v", err)
}
}
} else {
log.Println("watcher confirming write:", string(res.Buffer))
}
}
}
time.Sleep(time.Second)
}
log.Println("watcher controlloop closed!")
}(&WatcherControl)
log.Println("main thread waiting on wg")
wg.Wait()
log.Printf("Goodbye %s!", conn.RemoteAddr())
}
}
func menuHandler(w *gaio.Watcher, c net.Conn) {
wg := new(sync.WaitGroup)
// chan to terminate watcher goroutine when necessary
WatcherControl := make(chan string)
// watcher.WaitIO goroutine
wg.Add(1)
go func(WatcherControl *chan string) {
log.Println("starting menu watchercontrol")
defer wg.Done()
ControlLoop:
for {
log.Println("checking watcher waitio")
select {
case msg := <-*WatcherControl:
if msg == "stop" {
log.Println("closing menu watchercontrol")
break ControlLoop
}
default:
log.Println("menu watchercontrol waiting for IO")
results, err := w.WaitIO()
if err != nil {
log.Println(err)
return
}
log.Println("menu received IO!")
for _, res := range results {
if res.Operation == gaio.OpRead {
log.Println("menu receive: ", string(res.Buffer))
switch string(res.Buffer) {
case "":
// noop
case "welcome":
if err := w.Write(nil, c, welcomeHandler()); err != nil {
log.Printf("error sending welcomeHandler from cmd `welcome`: %v", err)
}
case "adventure":
// start the door and wait
log.Printf("starting door handler...")
/*
if err := doorHandler(&c); err != nil {
log.Printf("error from cmd `adventure`: %v", err)
}
log.Printf("returning from door")
n, err := c.Write([]byte("\n\n> "))
if err != nil { if err != nil {
log.Printf("error writing to connection: %v", err) log.Printf("error writing to connection: %v", err)
} else { } else {
log.Printf("wrote %d bytes to connection", n) log.Printf("wrote %d byte prompt to connection", n)
}
*/
case "exit":
/*
if err := exitHandler(); err != nil {
log.Printf("error sending exitHandler from cmd `exit`: %v", err)
}
*/
default:
err := w.Write(nil, c, []byte("huh?\n\n> "))
if err != nil {
log.Printf("error writing to connection: %v", err)
}
}
} else {
log.Println("watcher confirming write:", string(res.Buffer))
} }
} }
} }
}() time.Sleep(time.Second)
}
time.Sleep(time.Millisecond) log.Println("watcher controlloop closed!")
}(&WatcherControl)
log.Println("main thread waiting on wg")
wg.Wait() wg.Wait()
log.Printf("Goodbye %s!", c.RemoteAddr()) log.Printf("Goodbye %s!", c.RemoteAddr())
} }
func sendClientText(c net.Conn, s string) error { func sendClientText(w *gaio.Watcher, c net.Conn, s string) error {
n, err := c.Write([]byte(s)) err := w.Write(nil, c, []byte(s))
if err != nil { // svr.ListenAndServe() if err != nil {
log.Printf("error writing to connection: %v", err) log.Printf("error writing to connection: %v", err)
return err return err
} else {
log.Printf("wrote %d bytes to connection", n)
} }
return nil return nil
} }
func welcomeHandler(c *net.Conn) error { func welcomeHandler() []byte {
const bannerText = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + ` const bannerText = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + `
_ __ _ ` + "" + ` _ __ _ ` + "" + `
@ -163,13 +243,13 @@ func welcomeHandler(c *net.Conn) error {
___/ / /_/ /_/ / /_/ / / /_/ (__ ) ` + "" + ` ___/ / /_/ /_/ / /_/ / / /_/ (__ ) ` + "" + `
/____/\__/\__,_/\__,_/_/\____/____/ ` + "\x1b[32m\r\n\r\n\r\n" + ` /____/\__/\__,_/\__,_/_/\____/____/ ` + "\x1b[32m\r\n\r\n\r\n" + `
Help is available: type help` + "\r\n\r\n> " Help is available: type help` + "\r\n\r\n> "
err := sendClientText(*c, bannerText)
if err != nil { return []byte(bannerText)
log.Printf("error writing to connection: %v", err)
}
return nil
} }
/*
func exitHandler(c net.Conn) error { func exitHandler(c net.Conn) error {
const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + ` const exitMessage = "\x1b[2J\x1b[H\x1b[31m\x1b[J\r\n" + `
@ -258,3 +338,4 @@ func doorHandler(c *net.Conn) error {
} }
return nil return nil
} }
*/