Added context.Context to stop and correctly stop metrics now
This commit is contained in:
@@ -64,8 +64,11 @@ func main() {
|
|||||||
defer wg.Done()
|
defer wg.Done()
|
||||||
<-sigs
|
<-sigs
|
||||||
log.Info().Msg("Stopping server")
|
log.Info().Msg("Stopping server")
|
||||||
server.Shutdown(context.Background())
|
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
|
||||||
pipeline.Stop()
|
defer cancel()
|
||||||
|
|
||||||
|
server.Shutdown(ctx)
|
||||||
|
pipeline.Stop(ctx)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if config.Server.Ssl.Enabled {
|
if config.Server.Ssl.Enabled {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package middleware
|
package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -11,7 +12,7 @@ import (
|
|||||||
|
|
||||||
type RequestLogger struct{}
|
type RequestLogger struct{}
|
||||||
|
|
||||||
func (_ *RequestLogger) Stop() {
|
func (_ *RequestLogger) Stop(ctx context.Context) {
|
||||||
log.Info().Msg("Stopped Logging")
|
log.Info().Msg("Stopped Logging")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package middleware
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"cmp"
|
"cmp"
|
||||||
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@@ -17,7 +18,7 @@ type Metrics struct {
|
|||||||
endpointMetrics []EndpointMetrics
|
endpointMetrics []EndpointMetrics
|
||||||
ticker *time.Ticker
|
ticker *time.Ticker
|
||||||
file string
|
file string
|
||||||
stop chan bool
|
stop chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
type EndpointMetrics struct {
|
type EndpointMetrics struct {
|
||||||
@@ -133,12 +134,19 @@ func (m *Metrics) Flush() {
|
|||||||
log.Debug().Str("file", m.file).Int("count", len(a)).Msg("Completed Metrics flush")
|
log.Debug().Str("file", m.file).Int("count", len(a)).Msg("Completed Metrics flush")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Metrics) Stop() {
|
func (m *Metrics) Stop(ctx context.Context) {
|
||||||
log.Info().Msg("Stopping Request Metrics")
|
log.Info().Msg("Stopping Request Metrics")
|
||||||
for len(m.c) > 0 {
|
for len(m.c) > 0 {
|
||||||
rm := <- m.c
|
select {
|
||||||
|
case rm := <-m.c:
|
||||||
m.calculateDuration(rm)
|
m.calculateDuration(rm)
|
||||||
|
case <-ctx.Done():
|
||||||
|
m.stop <- struct{}{}
|
||||||
|
log.Warn().Msg("Hard Stopped Request Metrics")
|
||||||
|
return
|
||||||
|
}
|
||||||
}
|
}
|
||||||
m.Flush()
|
m.Flush()
|
||||||
|
m.stop <- struct{}{}
|
||||||
log.Info().Msg("Stopped Request Metrics")
|
log.Info().Msg("Stopped Request Metrics")
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package middleware
|
package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"slices"
|
"slices"
|
||||||
)
|
)
|
||||||
@@ -8,7 +9,7 @@ import (
|
|||||||
type Middleware interface {
|
type Middleware interface {
|
||||||
Use(http.Handler) http.Handler
|
Use(http.Handler) http.Handler
|
||||||
Manage()
|
Manage()
|
||||||
Stop()
|
Stop(context.Context)
|
||||||
}
|
}
|
||||||
|
|
||||||
type Pipeline struct {
|
type Pipeline struct {
|
||||||
@@ -33,9 +34,9 @@ func (p *Pipeline) Use() func(http.Handler) http.Handler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (p *Pipeline) Stop() {
|
func (p *Pipeline) Stop(ctx context.Context) {
|
||||||
for _, m := range p.middleware {
|
for _, m := range p.middleware {
|
||||||
m.Stop()
|
m.Stop(ctx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
package middleware
|
package middleware
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -37,7 +38,7 @@ func (l *Limiter) UpdateCleanupTime(new time.Duration) {
|
|||||||
l.cleanupTicker.Reset(new)
|
l.cleanupTicker.Reset(new)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (l *Limiter) Stop() {
|
func (l *Limiter) Stop(ctx context.Context) {
|
||||||
l.stop <- struct{}{}
|
l.stop <- struct{}{}
|
||||||
log.Info().Msg("Stopped Ratelimits")
|
log.Info().Msg("Stopped Ratelimits")
|
||||||
}
|
}
|
||||||
@@ -79,7 +80,7 @@ func (l *Limiter) Manage() {
|
|||||||
l.rwLock.Unlock()
|
l.rwLock.Unlock()
|
||||||
duration := time.Since(start)
|
duration := time.Since(start)
|
||||||
log.Debug().Str("duration", duration.String()).Int("deleted_buckets", deletedBuckets).Msg("Cleaned up Buckets")
|
log.Debug().Str("duration", duration.String()).Int("deleted_buckets", deletedBuckets).Msg("Cleaned up Buckets")
|
||||||
case <- l.stop:
|
case <-l.stop:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user