add request and release functions for pool, state tracking for scene

This commit is contained in:
Steven Polley 2023-02-27 23:33:35 -07:00
parent f44423dd38
commit bface08f20
2 changed files with 221 additions and 58 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"log"
"strconv"
"strings"
"time"
"github.com/g3n/engine/app"
@ -20,12 +21,17 @@ import (
"github.com/g3n/engine/window"
)
var (
scene *core.Node
tableConnections *gui.Table
)
func main() {
// Create application and scene
a := app.App(1920, 1080, "Leaky Pool")
scene := core.NewNode()
scene = core.NewNode()
// Set the scene to be managed by the gui manager
gui.Manager().Set(scene)
@ -74,6 +80,66 @@ func main() {
//GUI//
///////
// Connections Table
var err error
tableConnections, err = gui.NewTable(200, 200, []gui.TableColumn{
{Id: "1", Header: "Local Port", Width: 75, Expand: 0},
{Id: "2", Header: "Status", Width: 125, Expand: 0},
})
if err != nil {
log.Fatalf("failed to create tableConnections: %v", err)
}
tableConnections.SetBorders(1, 1, 1, 1)
tableConnections.SetPosition(0, 35)
tableConnections.SetMargins(10, 10, 10, 10)
editSend := gui.NewEdit(150, "data to transmit...")
editSend.SetPosition(tableConnections.Position().X+tableConnections.Width()+10, tableConnections.Position().Y+15)
scene.Add(editSend)
buttonSendRelease := gui.NewButton("Send and Release")
buttonSendRelease.SetPosition(editSend.Position().X+editSend.Width()+10, editSend.Position().Y-2)
buttonSendRelease.Subscribe(gui.OnClick, func(name string, ev interface{}) {
// Get the selected connection in the table
// Get the editSend text
// Call the "client program" function to convert to integer and transmit, then release
/* if len(tableConnections.SelectedRows()) != 1 {
return
}
row := tableConnections.Rows(0, -1)[tableConnections.SelectedRows()[0]]
if row["2"] != "Leased Out" {
return
}
var conn *net.TCPConn
for i := range dials {
if dials[i].Connection == nil {
continue
}
if row["1"] != strings.Split(dials[i].Connection.LocalAddr().String(), ":")[1] {
continue
}
conn = dials[i].Connection
break
}
if conn == nil {
log.Printf("could not match selected table entry to dialed connection")
return
}
*/
clientProgram(editSend.Text())
})
scene.Add(buttonSendRelease)
labelServer := gui.NewLabel("Server: ")
labelServer.SetPosition(10, 8)
scene.Add(labelServer)
@ -92,30 +158,28 @@ func main() {
editPoolSize.SetPosition(labelPoolSize.Position().X+labelPoolSize.Width()+10, 10)
scene.Add(editPoolSize)
buttonConnect := gui.NewButton("Start Pool")
buttonConnect.SetPosition(editPoolSize.Position().X+editPoolSize.Width()+10, 8)
buttonConnect.Subscribe(gui.OnClick, func(name string, ev interface{}) {
buttonPoolSize := gui.NewButton("Set")
buttonPoolSize.SetPosition(editPoolSize.Position().X+editPoolSize.Width()+10, 8)
buttonPoolSize.Subscribe(gui.OnClick, func(name string, ev interface{}) {
poolSize, err := strconv.Atoi(editPoolSize.Text())
if err != nil {
log.Printf("failed to convert poolSize '%d' to integer: %v", poolSize, err)
return
}
go connPoolWatchdog(editServer.Text(), poolSize, scene) // goroutine that keeps the pool full of healthy TCP connections
setConnPoolSize(poolSize)
})
scene.Add(buttonPoolSize)
buttonConnect.Label.SetText("Stop Pool") // stop not implemented
buttonStartPool := gui.NewButton("Start Pool")
buttonStartPool.SetPosition(buttonPoolSize.Position().X+buttonPoolSize.Width()+10, 8)
buttonStartPool.Subscribe(gui.OnClick, func(name string, ev interface{}) {
go connPoolWatchdog(editServer.Text()) // goroutine that keeps the pool full of healthy TCP connections
buttonStartPool.Label.SetText("Stop Pool") // stop not implemented - need to restart program
})
scene.Add(buttonConnect)
scene.Add(buttonStartPool)
// Create and add a button to the scene
btn := gui.NewButton("Make Red")
btn.SetPosition(100, 40)
btn.SetSize(40, 40)
btn.Subscribe(gui.OnClick, func(name string, ev interface{}) {
mat.SetColor(math32.NewColor("DarkRed"))
})
scene.Add(btn)
scene.Add(tableConnections)
// Run the application
a.Run(func(renderer *renderer.Renderer, deltaTime time.Duration) {
@ -124,3 +188,39 @@ func main() {
renderer.Render(scene, cam)
})
}
// updates the scene based on the status of the dial
// Disconnected, Connected, Leased Out
// Disconnected:
// Connected: dials[i].Mat.SetColor(math32.NewColor("LimeGreen"))
// Leased Out:
func updateSceneConnections() {
rows := make([]map[string]interface{}, 0, len(dials))
for i := range dials {
row := make(map[string]interface{})
if dials[i].Connection != nil {
row["1"] = strings.Split(dials[i].Connection.LocalAddr().String(), ":")[1]
} else {
row["1"] = "N/A"
}
row["2"] = dials[i].Status
switch dials[i].Status {
case "Disconnected":
dials[i].Mat.SetColor(math32.NewColor("Grey"))
case "Connected":
dials[i].Mat.SetColor(math32.NewColor("LimeGreen"))
case "Leased Out":
dials[i].Mat.SetColor(math32.NewColor("Blue"))
default:
log.Printf("connection has unknown status! Should be one of Disconnected, Connected or Leased Out")
}
rows = append(rows, row)
}
tableConnections.SetRows(rows)
}

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"log"
"net"
"time"
@ -12,23 +13,53 @@ import (
"github.com/g3n/engine/math32"
)
var dials []Dial
var (
dials []Dial
)
type Dial struct {
Connection net.Conn // the real deal underlying TCP connection
Connection *net.TCPConn // the real deal underlying TCP connection
Geo *geometry.Geometry // the 3D geometry
Mat *material.Standard // the material / color
Mesh *graphic.Mesh // the combined geometry and material
Node *core.Node // pointer to node in scene, required for removing the object if connection goes down
Status string // String to control what's displayed in the scene. Can be Disconnected, Connected, Leased Out
}
// TBD: Make less destructive when downsizing (prioritize dials without connections, then dials that aren't leased out, then what's left)
func setConnPoolSize(maxPoolSize int) {
for _, dial := range dials {
scene.Remove(dial.Mesh)
dial.Geo = nil
dial.Mat = nil
dial.Mesh = nil
dial.Node = nil
}
dials = make([]Dial, 0)
// Update the position of the connections on the scene
padding := float32(200 / (maxPoolSize + 1)) // space between the connection objects in 3D space
currX := float32(maxPoolSize/2) * -padding // variable gets updated, this is the initial starting position (left most connection in 3D space)
if maxPoolSize%2 == 0 {
currX += padding / 2
}
for i := 0; i < maxPoolSize; i++ {
dial := Dial{Geo: geometry.NewBox(1, 1, 1), Mat: material.NewStandard(math32.NewColor("Grey")), Status: "Disconnected"}
dial.Mesh = graphic.NewMesh(dial.Geo, dial.Mat)
dial.Mesh.SetPositionX(currX)
currX += padding
dial.Node = scene.Add(dial.Mesh)
dials = append(dials, dial)
}
updateSceneConnections()
}
// Keeps the pool full, replaces stale connections, and at the same time creates the objects in the 3D scenes representing physical connections
// should be ran in its own goroutine
func connPoolWatchdog(serverAddress string, maxPoolSize int, scene *core.Node) {
dials = make([]Dial, 0)
func connPoolWatchdog(serverAddress string) {
tcpAddr, err := net.ResolveTCPAddr("tcp4", serverAddress)
if err != nil {
log.Fatalf("failed to resolve serverAddress '%s': %v", serverAddress, err)
@ -36,48 +67,44 @@ func connPoolWatchdog(serverAddress string, maxPoolSize int, scene *core.Node) {
for {
stateChange := false // set to true if there's been any state changes since the last polling
// Check status of existing open connections in pool
i := 0 // output index
for _, dial := range dials {
if !isConnUp(dial.Connection) { // TBD: this should be moved to check before first write to the socket file descriptor instead of polling here
log.Printf("closing bad idle connection and removing from pool - %s", dial.Connection.LocalAddr().String())
dial.Connection.Close()
scene.Remove(dial.Mesh)
for i := range dials {
// if there is no existing connection
if dials[i].Connection == nil {
log.Printf("dialing new connection")
var err error
dials[i].Connection, err = net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Printf("failed to dial TCP connection: %v", err)
break
}
dials[i].Connection.SetKeepAlive(true)
dials[i].Connection.SetKeepAlivePeriod(time.Second * 5)
dials[i].Status = "Connected"
stateChange = true
continue
}
dials[i] = dial
i++
}
dials = dials[:i]
// fill any empty slots in the pool with fresh connections
for poolSize := len(dials); poolSize < maxPoolSize; poolSize++ {
log.Printf("Current pool size is '%d', desired pool size is '%d' - opening new connection...", poolSize, maxPoolSize)
conn, err := net.DialTCP("tcp", nil, tcpAddr)
if err != nil {
log.Printf("failed to dial TCP connection: %v", err)
break
// test if the connection is still working
if !isConnUp(dials[i].Connection) { // TBD: this should be moved to check before first write to the socket file descriptor instead of polling here
log.Printf("closing bad idle connection and removing from pool - %s", dials[i].Connection.LocalAddr().String())
dials[i].Connection.Close()
dials[i].Connection = nil
dials[i].Status = "Disconnected"
stateChange = true
continue
}
conn.SetKeepAlive(true)
conn.SetKeepAlivePeriod(time.Second * 5)
dial := Dial{Connection: conn, Geo: geometry.NewBox(1, 1, 1), Mat: material.NewStandard(math32.NewColor("LimeGreen"))}
dial.Mesh = graphic.NewMesh(dial.Geo, dial.Mat)
dial.Node = scene.Add(dial.Mesh)
dials = append(dials, dial)
}
// Update the position of the connections on the scene
padding := float32(200 / (len(dials) + 1)) // space between the connection objects in 3D space
currX := float32(len(dials)/2) * -padding // variable gets updated, this is the initial starting position (left most connection in 3D space)
if len(dials)%2 == 0 {
currX += padding / 2
}
//currX := padding - 100
for _, dial := range dials {
dial.Mesh.SetPositionX(currX)
currX += padding
if stateChange {
updateSceneConnections()
}
time.Sleep(time.Second * 5) // random sleep, AKA evidence this shouldn't be a watchdog,
@ -85,9 +112,45 @@ func connPoolWatchdog(serverAddress string, maxPoolSize int, scene *core.Node) {
}
}
// Checks with OS to ensure that a connection is still active
// TBD: Refactor to buffered channel instead of finding one on the fly?
func requestLease() (*net.TCPConn, error) {
inPoolConns := make(map[int]Dial)
for i, dial := range dials {
if dial.Status == "Connected" {
inPoolConns[i] = dial
}
}
if len(inPoolConns) == 0 {
return nil, fmt.Errorf("pool exhausted")
}
// Selects a random connection from the pool
var i int
for i = range inPoolConns {
break
}
dials[i].Status = "Leased Out"
updateSceneConnections()
return dials[i].Connection, nil
}
func releaseConnection(conn *net.TCPConn) {
for i := range dials {
if dials[i].Connection.LocalAddr().String() != conn.LocalAddr().String() {
continue
}
dials[i].Status = "Connected"
updateSceneConnections()
return
}
}
// Checks if connection is still up by reading the heartbeat data from the server
// returns err if connection is not active
func isConnUp(conn net.Conn) bool {
func isConnUp(conn *net.TCPConn) bool {
conn.SetReadDeadline(time.Now().Add(time.Millisecond * 300))
buf := make([]byte, 128)