Optimized and beautified concurrency

Credits to Kangaroo rat and zyxkad
This commit is contained in:
Verox007 2023-12-17 00:43:24 +01:00
parent 4a0bbfa81a
commit 720f70338f
3 changed files with 96 additions and 68 deletions

View File

@ -1,6 +1,7 @@
package cmd package cmd
import ( import (
"context"
"fmt" "fmt"
"github.com/eiannone/keyboard" "github.com/eiannone/keyboard"
"sync" "sync"
@ -44,7 +45,7 @@ func (l *LoggerQueue) Error(message string) {
l.log("ERROR", message, "") l.log("ERROR", message, "")
} }
func (l *LoggerQueue) processLogs(stopChan chan struct{}) { func (l *LoggerQueue) processLogs(ctx context.Context) {
ticker := time.NewTicker(100 * time.Millisecond) ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop() defer ticker.Stop()
@ -52,9 +53,8 @@ func (l *LoggerQueue) processLogs(stopChan chan struct{}) {
select { select {
case logEntry := <-l.queue: case logEntry := <-l.queue:
displayLog(logEntry) displayLog(logEntry)
case <-stopChan: case <-ctx.Done():
fmt.Print("\033[2K\r") fmt.Print("\033[2K\r")
println("Stopping")
return return
case <-ticker.C: case <-ticker.C:
refreshInputDisplay(l.currentInput) refreshInputDisplay(l.currentInput)
@ -71,53 +71,62 @@ func refreshInputDisplay(input string) {
fmt.Print("\033[2K\r> " + input) fmt.Print("\033[2K\r> " + input)
} }
func Start(wg *sync.WaitGroup, stopChan chan struct{}) { func Run(baseCtx context.Context) {
defer wg.Done() ctx, cancel := context.WithCancel(baseCtx)
defer cancel()
if err := keyboard.Open(); err != nil {
panic(err)
}
defer func() {
go keyboard.Close()
}()
go Logger.processLogs(stopChan)
input := "" input := ""
for { event, err := keyboard.GetKeys(10)
char, key, err := keyboard.GetKey()
if err != nil { if err != nil {
Logger.Error(fmt.Sprintf("Keyboard error: %v", err)) Logger.Error(fmt.Sprintf("Keyboard error: %v", err))
break return
} }
switch { var wg sync.WaitGroup
case key == keyboard.KeyEnter:
wg.Add(1)
go func() {
defer wg.Done()
Logger.processLogs(ctx)
}()
loop:
for {
select {
case <-ctx.Done():
cancel()
break loop
case event := <-event:
if event.Rune != 0 {
input += string(event.Rune)
}
switch event.Key {
case keyboard.KeyEnter:
switch input { switch input {
case "": case "":
// Do nothing // Do nothing
case "stop": case "stop":
Logger.Info("Received stop command.") Logger.Info("Received stop command.")
close(stopChan) cancel()
return break loop
default: default:
Logger.Info(fmt.Sprintf("Unknown command: %s", input)) Logger.Info(fmt.Sprintf("Unknown command: %s", input))
} }
input = "" input = ""
case key == keyboard.KeyBackspace || key == keyboard.KeyBackspace2: case keyboard.KeyBackspace | keyboard.KeyBackspace2:
if len(input) > 0 { if len(input) > 0 {
input = input[:len(input)-1] input = input[:len(input)-1]
} }
case key == keyboard.KeySpace: case keyboard.KeySpace:
input += " " input += " "
case key == keyboard.KeyCtrlC: case keyboard.KeyCtrlC:
Logger.Info("Received stop command.") Logger.Info("Received stop command.")
close(stopChan) cancel()
return break loop
case char != 0:
input += string(char)
} }
Logger.currentInput = input // Update stored user input Logger.currentInput = input // Update stored user input
} }
}
wg.Wait()
} }

View File

@ -2,39 +2,40 @@ package net
import ( import (
"cimeyclust.com/steel/pkg/cmd" "cimeyclust.com/steel/pkg/cmd"
"context"
"fmt" "fmt"
"net" "net"
"sync"
"time" "time"
) )
// Start Starts the TCP server on the specified address. // Start Starts the TCP server on the specified address.
func Start(addr string, stopChan <-chan struct{}, wg *sync.WaitGroup) { func Run(baseCtx context.Context, addr string) {
defer wg.Done() ctx, cancel := context.WithCancel(baseCtx)
defer cancel()
// Start listening on the specified address // Start listening on the specified address
listener, err := net.Listen("tcp", addr) listener, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
cmd.Logger.Error("Error starting TCP server: %v") cmd.Logger.Error("Error starting TCP server: %v")
return
} }
// Close the listener when the application closes. // Close the listener when the application closes.
defer func(listener net.Listener) { defer listener.Close()
err := listener.Close()
if err != nil {
return
}
}(listener)
// slog.Info(fmt.Sprintf("Listening on %s", addr))
cmd.Logger.Info(fmt.Sprintf("Listening on %s", addr)) cmd.Logger.Info(fmt.Sprintf("Listening on %s", addr))
// var wg sync.WaitGroup
go func() { go func() {
go func() {
<-ctx.Done()
}()
for { for {
// Print test every second // Print test every second
select { select {
case <-stopChan: case <-ctx.Done():
return // Safe exit if stop has been signalled return
default: default:
cmd.Logger.Info("Test") cmd.Logger.Info("Test")
} }
@ -43,13 +44,18 @@ func Start(addr string, stopChan <-chan struct{}, wg *sync.WaitGroup) {
}() }()
// Listen for an incoming connection in a goroutine. // Listen for an incoming connection in a goroutine.
// wg.Add(1)
go func() { go func() {
go func() {
<-ctx.Done()
listener.Close()
}()
for { for {
conn, err := listener.Accept() conn, err := listener.Accept()
if err != nil { if err != nil {
select { select {
case <-stopChan: case <-ctx.Done():
return // Safe exit if stop has been signalled return
default: default:
cmd.Logger.Error(fmt.Sprintf("Error while accepting conn: %v", err)) cmd.Logger.Error(fmt.Sprintf("Error while accepting conn: %v", err))
} }
@ -61,8 +67,8 @@ func Start(addr string, stopChan <-chan struct{}, wg *sync.WaitGroup) {
} }
}() }()
// Block until we receive a stop signal // Wait for all components to finish
<-stopChan <-ctx.Done()
} }
// handleRequest handles incoming requests. // handleRequest handles incoming requests.

View File

@ -3,21 +3,34 @@ package main
import ( import (
"cimeyclust.com/steel/pkg/cmd" "cimeyclust.com/steel/pkg/cmd"
"cimeyclust.com/steel/pkg/net" "cimeyclust.com/steel/pkg/net"
"context"
"sync" "sync"
) )
func main() { func main() {
// Channel for stopping the program // Channel for stopping the program
stopChan := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
// Start the console handler // Start the console handler
wg.Add(1) wg.Add(1)
go cmd.Start(&wg, stopChan) go func() {
defer wg.Done()
defer cancel()
cmd.Run(ctx)
}()
// Start the network server // Start the network server
wg.Add(1) wg.Add(1)
go net.Start("localhost:8080", stopChan, &wg) go func() {
defer wg.Done()
net.Run(ctx, "localhost:8080")
}()
cmd.Logger.Info("Started") cmd.Logger.Info("Started")