Added Logging and fault tolerancy to Server

This commit is contained in:
Pablu23
2023-11-28 15:50:11 +01:00
parent d812180ce0
commit 0e046d9deb
3 changed files with 191 additions and 42 deletions

View File

@@ -6,10 +6,14 @@ import (
"crypto/rsa"
"encoding/hex"
"errors"
"fmt"
"io"
"net"
"os"
"os/signal"
"sync"
"time"
log "github.com/sirupsen/logrus"
)
type info struct {
@@ -17,10 +21,12 @@ type info struct {
lastSync uint32
lastPckSend common.HeaderFlag
key [32]byte
time time.Time
}
type Server struct {
sessions map[common.SessionID]*info
mu sync.Mutex
rsa *rsa.PrivateKey
}
@@ -38,13 +44,22 @@ func New() (*Server, error) {
}
func (server *Server) sendPacket(conn *net.UDPConn, addr *net.UDPAddr, pck *common.Packet) {
key := server.sessions[pck.Sid].key
// fmt.Printf("Sending Packet, Sync: %v, Type: %v\n", pck.Sync, pck.Flag)
server.mu.Lock()
var key [32]byte
if info, ok := server.sessions[pck.Sid]; ok {
key = info.key
server.sessions[pck.Sid].time = time.Now()
server.mu.Unlock()
} else {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
secPck := common.NewSymetricSecurePacket(key, pck)
if _, err := conn.WriteToUDP(secPck.ToBytes(), addr); err != nil {
panic(err)
log.Error("Could not write Packet to UDP")
return
}
}
@@ -58,19 +73,35 @@ func (server *Server) handlePacket(conn *net.UDPConn, addr *net.UDPAddr, rPacket
break
case common.Resend:
server.resend(conn, addr, rPacket)
break
default:
log.WithField("Packet Type", rPacket.Flag).Error("Unexpected Packet Type")
break
}
}
func (server *Server) resend(conn *net.UDPConn, addr *net.UDPAddr, pck *common.Packet) {
resend, err := pck.GetUint32Payload()
if err != nil {
panic(err)
log.Error("Error getting Resend Sync from Packet")
return
}
server.mu.Lock()
var path string
if info, ok := server.sessions[pck.Sid]; ok {
path = info.path
server.mu.Unlock()
} else {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
path := server.sessions[pck.Sid].path
file, err := os.Open(path)
if err != nil {
panic(err)
log.WithError(err).WithField("File Path", path).Error("Unable to open File")
return
}
defer file.Close()
@@ -80,7 +111,8 @@ func (server *Server) resend(conn *net.UDPConn, addr *net.UDPAddr, pck *common.P
_, err = file.ReadAt(buf, offset)
if err != nil && !errors.Is(err, io.EOF) {
panic(err)
log.WithError(err).WithField("File Path", path).Error("Unable to read File")
return
}
resendPck := common.NewResendFile(pck, buf)
@@ -90,34 +122,47 @@ func (server *Server) resend(conn *net.UDPConn, addr *net.UDPAddr, pck *common.P
func (server *Server) handleAck(conn *net.UDPConn, addr *net.UDPAddr, pck *common.Packet) {
ack, err := pck.GetUint32Payload()
if err != nil {
panic(err)
}
session := server.sessions[pck.Sid]
if session == nil {
panic(err)
}
if ack != session.lastSync {
fmt.Printf("Wrong Ack %v, expected %v\n", ack, session.lastSync)
log.WithError(err).Error("Getting Acknowledge from Packet")
return
}
if session.lastPckSend == common.End {
fmt.Printf("Deleting Session %v\n", hex.EncodeToString(pck.Sid[:]))
delete(server.sessions, pck.Sid)
server.mu.Lock()
if session, ok := server.sessions[pck.Sid]; ok {
if ack != session.lastSync {
log.WithFields(log.Fields{
"Expected": session.lastSync,
"Received": ack,
}).Warn("Received wrong Acknowledge")
return
}
if session.lastPckSend == common.End {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Info("Closing Session")
delete(server.sessions, pck.Sid)
server.mu.Unlock()
} else {
server.mu.Unlock()
server.sendData(conn, addr, pck)
}
} else {
server.sendData(conn, addr, pck)
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
}
func (server *Server) sendPTE(conn *net.UDPConn, addr *net.UDPAddr, pck *common.Packet) {
path, err := pck.GetFilePath()
if err != nil {
panic(err)
log.WithError(err).Error("Unable to get File Path")
return
}
fi, err := os.Stat(path)
if err != nil {
panic(err)
log.WithError(err).WithField("File Path", path).Error("Unable to open File")
return
}
fileSize := fi.Size()
@@ -125,16 +170,36 @@ func (server *Server) sendPTE(conn *net.UDPConn, addr *net.UDPAddr, pck *common.
ptePck := common.NewPte(uint32(fileSize), pck)
server.sendPacket(conn, addr, ptePck)
server.sessions[pck.Sid].path = path
server.sessions[pck.Sid].lastSync = ptePck.Sync
server.sessions[pck.Sid].lastPckSend = ptePck.Flag
server.mu.Lock()
if info, ok := server.sessions[pck.Sid]; ok {
info.path = path
info.lastSync = ptePck.Sync
info.lastPckSend = ptePck.Flag
server.mu.Unlock()
} else {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
}
func (server *Server) sendData(conn *net.UDPConn, addr *net.UDPAddr, pck *common.Packet) {
path := server.sessions[pck.Sid].path
var path string
server.mu.Lock()
if info, ok := server.sessions[pck.Sid]; ok {
path = info.path
server.mu.Unlock()
} else {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
file, err := os.Open(path)
if err != nil {
panic(err)
log.WithError(err).WithField("File Path", path).Error("Unable to open File")
return
}
defer file.Close()
@@ -143,7 +208,8 @@ func (server *Server) sendData(conn *net.UDPConn, addr *net.UDPAddr, pck *common
for {
r, err := file.Read(buf)
if err != nil && !errors.Is(err, io.EOF) {
panic(err)
log.WithError(err).WithField("File Path", path).Error("Unable to read File")
return
}
if r == 0 {
break
@@ -154,49 +220,117 @@ func (server *Server) sendData(conn *net.UDPConn, addr *net.UDPAddr, pck *common
}
eodPck := common.NewEnd(filePck)
server.sessions[pck.Sid].lastSync = eodPck.Sync
server.sessions[pck.Sid].lastPckSend = eodPck.Flag
server.mu.Lock()
if info, ok := server.sessions[pck.Sid]; ok {
info.lastSync = eodPck.Sync
info.lastPckSend = eodPck.Flag
server.mu.Unlock()
} else {
log.WithField("SessionID", hex.EncodeToString(pck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
return
}
server.sendPacket(conn, addr, eodPck)
}
func (server *Server) startTimeout(interuptChan chan bool) {
running := true
for running {
select {
case c := <-interuptChan:
if c {
running = false
}
break
case <-time.After(time.Second * 30):
server.cleanup()
break
}
}
}
func (server *Server) cleanup() {
server.mu.Lock()
for sid, info := range server.sessions {
if time.Now().After(info.time.Add(30 * time.Second)) {
delete(server.sessions, sid)
log.WithField("SessionID", hex.EncodeToString(sid[:])).Info("Closed session")
}
}
server.mu.Unlock()
}
func (server *Server) handleShutdown(stop chan bool) {
c := make(chan os.Signal, 1)
signal.Notify(c, os.Interrupt)
go func() {
for range c {
stop <- true
log.Info("Server is shutting down")
os.Exit(0)
}
}()
}
func (server *Server) Serve() {
udpAddr, err := net.ResolveUDPAddr("udp", "0.0.0.0:13374")
if err != nil {
fmt.Println(err)
os.Exit(1)
log.Fatal("Could not resolve UDP Address")
}
log.Infof("Starting server on %v:%v", udpAddr.IP, udpAddr.Port)
conn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
fmt.Println(err)
os.Exit(1)
log.Fatal("Could not start listening")
}
log.Info("Started listening")
c := make(chan bool)
server.handleShutdown(c)
go server.startTimeout(c)
for {
var buf [common.PacketSize]byte
_, addr, err := conn.ReadFromUDP(buf[0:])
if err != nil {
fmt.Println(err)
return
log.Error("Could not retrieve UDP Packet")
continue
}
secPck := common.SecurePacketFromBytes(buf[:])
if secPck.IsRsa == 0 {
key := server.sessions[secPck.Sid].key
var key [32]byte
server.mu.Lock()
if info, ok := server.sessions[secPck.Sid]; ok {
key = info.key
} else {
log.WithField("SessionID", hex.EncodeToString(secPck.Sid[:])).Warn("Invalid Session")
server.mu.Unlock()
continue
}
server.mu.Unlock()
pck, err := secPck.ExtractPacket(key)
if err != nil {
fmt.Println(err)
log.Error("Could not extract Packet from Secure Packet")
}
go server.handlePacket(conn, addr, &pck)
} else {
key := secPck.ExtractKey()
fmt.Printf("Session: %v, Key: %v\n", hex.EncodeToString(secPck.Sid[:]), hex.EncodeToString(key))
log.WithField("SessionID", hex.EncodeToString(secPck.Sid[:])).Info("New Session")
server.sessions[secPck.Sid] = &info{
key: [32]byte(key),
}
}
}
}