From 84a5674b14aa982b8e60c21d1706c8e81ec95d40 Mon Sep 17 00:00:00 2001 From: David Date: Wed, 2 Jul 2025 11:36:09 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=93=88=20Report=20traffic=20dropped=20met?= =?UTF-8?q?rics=20to=20LAPI=20(#223)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Initial implementation * fix * fixes * Fixes * xx * progress * xx * xx * xx * fix linter * Progress * Fixes * xx * xx * Remove trace logger * Last fix * fix lint * fix lint * fix lint --------- Co-authored-by: Max Lerebourg --- README.md | 24 +++-- bouncer.go | 160 ++++++++++++++++++++++++----- bouncer_test.go | 4 +- docker-compose.local.yml | 31 +++++- docker-compose.yml | 2 +- pkg/configuration/configuration.go | 17 ++- pkg/logger/logger.go | 40 +++++--- 7 files changed, 226 insertions(+), 52 deletions(-) diff --git a/README.md b/README.md index 7e7b0df..da74f1a 100644 --- a/README.md +++ b/README.md @@ -308,12 +308,18 @@ make run ### Note -**/!\ Cache is shared by all services** -_This means if an IP is banned, all services which are protected by an instance of the plugin will deny requests from that IP_ -Only one instance of the plugin is _possible_. +> [!IMPORTANT] +> Some of the behaviours and configuration parameters are shared globally across *all* crowdsec middlewares even if you declare different middlewares with different settings. +> +> **Cache is shared by all services**: This means if an IP is banned, all services which are protected by an instance of the plugin will deny requests from that IP +> +> If you define different caches for different middlewares, only the first one to be instantiated will be bound to the crowdsec stream. +> +> Overall, this middleware is designed in such a way that **only one instance of the plugin is *possible*.** You can have multiple crowdsec middlewares in the same cluster, the key parameters must be aligned (MetricsUpdateIntervalSeconds, CrowdsecMode, CrowdsecAppsecEnabled, etc.) -**/!\ Appsec maximum body limit is defaulted to 10MB** -_By careful when you upgrade to >1.4.x_ +> [!WARNING] +> **Appsec maximum body limit is defaulted to 10MB** +> *Be careful when you upgrade to >1.4.x* ### Variables @@ -324,11 +330,16 @@ _By careful when you upgrade to >1.4.x_ - LogLevel - string - default: `INFO`, expected values are: `INFO`, `DEBUG`, `ERROR` - - Log are written to `stdout` / `stderr` of file if LogFilePath is provided + - Log are written to `stdout` / `stderr` or file if LogFilePath is provided - LogFilePath - string - default: "" - File Path to write logs, must be writable by Traefik, Log rotation may require a restart of traefik +- MetricsUpdateIntervalSeconds + - int64 + - default: 600 + - Interval in seconds between metrics updates to Crowdsec + - If set to zero or less, metrics collection is disabled - CrowdsecMode - string - default: `live`, expected values are: `none`, `live`, `stream`, `alone`, `appsec` @@ -579,6 +590,7 @@ http: captchaGracePeriodSeconds: 1800 captchaHTMLFilePath: /captcha.html banHTMLFilePath: /ban.html + metricsUpdateIntervalSeconds: 600 ``` #### Fill variable with value of file diff --git a/bouncer.go b/bouncer.go index 10e4f65..1fc560e 100644 --- a/bouncer.go +++ b/bouncer.go @@ -12,7 +12,9 @@ import ( "io" "net/http" "net/url" + "strconv" "strings" + "sync/atomic" "text/template" "time" @@ -33,6 +35,7 @@ const ( crowdsecLapiHeader = "X-Api-Key" crowdsecLapiRoute = "v1/decisions" crowdsecLapiStreamRoute = "v1/decisions/stream" + crowdsecLapiMetricsRoute = "v1/usage-metrics" crowdsecCapiHost = "api.crowdsec.net" crowdsecCapiHeader = "Authorization" crowdsecCapiLoginRoute = "v2/watchers/login" @@ -40,12 +43,32 @@ const ( cacheTimeoutKey = "updated" ) +// ############################################################## +// Important: traefik creates an instance of the bouncer per route. +// We rely on globals (both here and in the memory cache) to share info between +// routes. This means that some of the plugins parameters will only work "once" +// and will take the values of the first middleware that was instantiated even +// if you have different middlewares with different parameters. This design +// makes it impossible to have multiple crowdsec implementations per cluster (unless you have multiple traefik deployments in it) +// - updateInterval +// - updateMaxFailure +// - defaultDecisionTimeout +// - redisUnreachableBlock +// - appsecEnabled +// - appsecHost +// - metricsUpdateIntervalSeconds +// - others... +// ################################### + //nolint:gochecknoglobals var ( isStartup = true isCrowdsecStreamHealthy = true - updateFailure = 0 - ticker chan bool + updateFailure int64 + streamTicker chan bool + metricsTicker chan bool + lastMetricsPush time.Time + blockedRequests int64 ) // CreateConfig creates the default plugin configuration. @@ -75,7 +98,7 @@ type Bouncer struct { crowdsecPassword string crowdsecScenarios []string updateInterval int64 - updateMaxFailure int + updateMaxFailure int64 defaultDecisionTimeout int64 remediationStatusCode int remediationCustomHeader string @@ -93,6 +116,8 @@ type Bouncer struct { } // New creates the crowdsec bouncer plugin. +// +//nolint:gocyclo func New(_ context.Context, next http.Handler, config *configuration.Config, name string) (http.Handler, error) { config.LogLevel = strings.ToUpper(config.LogLevel) log := logger.New(config.LogLevel, config.LogFilePath) @@ -225,7 +250,7 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam return nil, err } - if (config.CrowdsecMode == configuration.StreamMode || config.CrowdsecMode == configuration.AloneMode) && ticker == nil { + if (config.CrowdsecMode == configuration.StreamMode || config.CrowdsecMode == configuration.AloneMode) && streamTicker == nil { if config.CrowdsecMode == configuration.AloneMode { if err := getToken(bouncer); err != nil { bouncer.log.Error("New:getToken " + err.Error()) @@ -234,10 +259,20 @@ func New(_ context.Context, next http.Handler, config *configuration.Config, nam } handleStreamTicker(bouncer) isStartup = false - ticker = startTicker(config, log, func() { + streamTicker = startTicker("stream", config.UpdateIntervalSeconds, log, func() { handleStreamTicker(bouncer) }) } + + // Start metrics ticker if not already running + if metricsTicker == nil && config.MetricsUpdateIntervalSeconds > 0 { + lastMetricsPush = time.Now() // Initialize lastMetricsPush when starting the metrics ticker + handleMetricsTicker(bouncer) + metricsTicker = startTicker("metrics", config.MetricsUpdateIntervalSeconds, log, func() { + handleMetricsTicker(bouncer) + }) + } + bouncer.log.Debug("New initialized mode:" + config.CrowdsecMode) return bouncer, nil @@ -353,6 +388,8 @@ type Login struct { // To append Headers we need to call rw.WriteHeader after set any header. func handleBanServeHTTP(bouncer *Bouncer, rw http.ResponseWriter) { + atomic.AddInt64(&blockedRequests, 1) + if bouncer.remediationCustomHeader != "" { rw.Header().Set(bouncer.remediationCustomHeader, "ban") } @@ -375,6 +412,7 @@ func handleRemediationServeHTTP(bouncer *Bouncer, remoteIP, remediation string, handleNextServeHTTP(bouncer, remoteIP, rw, req) return } + atomic.AddInt64(&blockedRequests, 1) // If we serve a captcha that should count as a dropped request. bouncer.captchaClient.ServeHTTP(rw, req, remoteIP) return } @@ -406,11 +444,17 @@ func handleStreamTicker(bouncer *Bouncer) { } } -func startTicker(config *configuration.Config, log *logger.Log, work func()) chan bool { - ticker := time.NewTicker(time.Duration(config.UpdateIntervalSeconds) * time.Second) +func handleMetricsTicker(bouncer *Bouncer) { + if err := reportMetrics(bouncer); err != nil { + bouncer.log.Error("handleMetricsTicker:reportMetrics " + err.Error()) + } +} + +func startTicker(name string, updateInterval int64, log *logger.Log, work func()) chan bool { + ticker := time.NewTicker(time.Duration(updateInterval) * time.Second) stop := make(chan bool, 1) go func() { - defer log.Debug("ticker:stopped") + defer log.Debug(name + "_ticker:stopped") for { select { case <-ticker.C: @@ -432,7 +476,7 @@ func handleNoStreamCache(bouncer *Bouncer, remoteIP string) (string, error) { Path: bouncer.crowdsecPath + crowdsecLapiRoute, RawQuery: fmt.Sprintf("ip=%v", remoteIP), } - body, err := crowdsecQuery(bouncer, routeURL.String(), false) + body, err := crowdsecQuery(bouncer, routeURL.String(), nil) if err != nil { return cache.BannedValue, err } @@ -491,7 +535,16 @@ func getToken(bouncer *Bouncer) error { Host: bouncer.crowdsecHost, Path: crowdsecCapiLoginRoute, } - body, err := crowdsecQuery(bouncer, loginURL.String(), true) + + // Move the login-specific payload here + loginData := []byte(fmt.Sprintf( + `{"machine_id": "%v","password": "%v","scenarios": ["%v"]}`, + bouncer.crowdsecMachineID, + bouncer.crowdsecPassword, + strings.Join(bouncer.crowdsecScenarios, `","`), + )) + + body, err := crowdsecQuery(bouncer, loginURL.String(), loginData) if err != nil { return err } @@ -528,7 +581,7 @@ func handleStreamCache(bouncer *Bouncer) error { Path: bouncer.crowdsecPath + bouncer.crowdsecStreamRoute, RawQuery: fmt.Sprintf("startup=%t", !isCrowdsecStreamHealthy || isStartup), } - body, err := crowdsecQuery(bouncer, streamRouteURL.String(), false) + body, err := crowdsecQuery(bouncer, streamRouteURL.String(), nil) if err != nil { return err } @@ -559,15 +612,9 @@ func handleStreamCache(bouncer *Bouncer) error { return nil } -func crowdsecQuery(bouncer *Bouncer, stringURL string, isPost bool) ([]byte, error) { +func crowdsecQuery(bouncer *Bouncer, stringURL string, data []byte) ([]byte, error) { var req *http.Request - if isPost { - data := []byte(fmt.Sprintf( - `{"machine_id": "%v","password": "%v","scenarios": ["%v"]}`, - bouncer.crowdsecMachineID, - bouncer.crowdsecPassword, - strings.Join(bouncer.crowdsecScenarios, `","`), - )) + if len(data) > 0 { req, _ = http.NewRequest(http.MethodPost, stringURL, bytes.NewBuffer(data)) } else { req, _ = http.NewRequest(http.MethodGet, stringURL, nil) @@ -588,13 +635,16 @@ func crowdsecQuery(bouncer *Bouncer, stringURL string, isPost bool) ([]byte, err if errToken := getToken(bouncer); errToken != nil { return nil, fmt.Errorf("crowdsecQuery:renewToken url:%s %w", stringURL, errToken) } - return crowdsecQuery(bouncer, stringURL, false) + return crowdsecQuery(bouncer, stringURL, nil) } - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("crowdsecQuery url:%s, statusCode:%d", stringURL, res.StatusCode) - } - body, err := io.ReadAll(res.Body) + // Check if the status code starts with 2 + statusStr := strconv.Itoa(res.StatusCode) + if len(statusStr) < 1 || statusStr[0] != '2' { + return nil, fmt.Errorf("crowdsecQuery method:%s url:%s, statusCode:%d (expected: 2xx)", req.Method, stringURL, res.StatusCode) + } + + body, err := io.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("crowdsecQuery:readBody %w", err) } @@ -664,3 +714,65 @@ func appsecQuery(bouncer *Bouncer, ip string, httpReq *http.Request) error { } return nil } + +func reportMetrics(bouncer *Bouncer) error { + now := time.Now() + currentCount := atomic.LoadInt64(&blockedRequests) + windowSizeSeconds := int(now.Sub(lastMetricsPush).Seconds()) + + bouncer.log.Debug(fmt.Sprintf("reportMetrics: blocked_requests=%d window_size=%ds", currentCount, windowSizeSeconds)) + + metrics := map[string]interface{}{ + "remediation_components": []map[string]interface{}{ + { + "version": "1.X.X", + "type": "bouncer", + "name": "traefik_plugin", + "metrics": []map[string]interface{}{ + { + "items": []map[string]interface{}{ + { + "name": "dropped", + "value": currentCount, + "unit": "request", + "labels": map[string]string{ + "type": "traefik_plugin", + }, + }, + }, + "meta": map[string]interface{}{ + "window_size_seconds": windowSizeSeconds, + "utc_now_timestamp": now.Unix(), + }, + }, + }, + "utc_startup_timestamp": time.Now().Unix(), + "feature_flags": []string{}, + "os": map[string]string{ + "name": "unknown", + "version": "unknown", + }, + }, + }, + } + + data, err := json.Marshal(metrics) + if err != nil { + return fmt.Errorf("reportMetrics:marshal %w", err) + } + + metricsURL := url.URL{ + Scheme: bouncer.crowdsecScheme, + Host: bouncer.crowdsecHost, + Path: bouncer.crowdsecPath + crowdsecLapiMetricsRoute, + } + + _, err = crowdsecQuery(bouncer, metricsURL.String(), data) + if err != nil { + return fmt.Errorf("reportMetrics:query %w", err) + } + + atomic.StoreInt64(&blockedRequests, 0) + lastMetricsPush = now + return nil +} diff --git a/bouncer_test.go b/bouncer_test.go index 270a5f5..9499506 100644 --- a/bouncer_test.go +++ b/bouncer_test.go @@ -163,7 +163,7 @@ func Test_crowdsecQuery(t *testing.T) { type args struct { bouncer *Bouncer stringURL string - isPost bool + data []byte } tests := []struct { name string @@ -175,7 +175,7 @@ func Test_crowdsecQuery(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := crowdsecQuery(tt.args.bouncer, tt.args.stringURL, tt.args.isPost) + got, err := crowdsecQuery(tt.args.bouncer, tt.args.stringURL, tt.args.data) if (err != nil) != tt.wantErr { t.Errorf("crowdsecQuery() error = %v, wantErr %v", err, tt.wantErr) return diff --git a/docker-compose.local.yml b/docker-compose.local.yml index 6812bb0..040496a 100644 --- a/docker-compose.local.yml +++ b/docker-compose.local.yml @@ -36,7 +36,7 @@ services: - "traefik.http.routers.router-foo.middlewares=crowdsec@docker" - "traefik.http.services.service-foo.loadbalancer.server.port=80" - whoami2: + bar: image: traefik/whoami container_name: "simple-service-bar" restart: unless-stopped @@ -48,12 +48,38 @@ services: - "traefik.http.services.service-bar.loadbalancer.server.port=80" - "traefik.http.middlewares.crowdsec.plugin.bouncer.enabled=true" - "traefik.http.middlewares.crowdsec.plugin.bouncer.loglevel=DEBUG" + - "traefik.http.middlewares.crowdsec.plugin.bouncer.metricsupdateintervalseconds=15" - "traefik.http.middlewares.crowdsec.plugin.bouncer.crowdsecappsecenabled=true" - "traefik.http.middlewares.crowdsec.plugin.bouncer.crowdsecmode=stream" - "traefik.http.middlewares.crowdsec.plugin.bouncer.crowdseclapikey=40796d93c2958f9e58345514e67740e5=" + bar2: + image: traefik/whoami + container_name: "simple-service-bar2" + restart: unless-stopped + labels: + - "traefik.enable=true" + - "traefik.http.routers.router-bar2.rule=PathPrefix(`/bar2`)" + - "traefik.http.routers.router-bar2.entrypoints=web" + - "traefik.http.routers.router-bar2.middlewares=crowdsec2@docker" + - "traefik.http.services.service-bar2.loadbalancer.server.port=80" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.enabled=true" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.loglevel=DEBUG" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.crowdsecmode=stream" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.updateintervalseconds=10" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.updatemaxfailure=-1" + - "traefik.http.middlewares.crowdsec2.plugin.bouncer.crowdseclapikey=40796d93c2958f9e58345514e67740e5=" + bar3: + image: traefik/whoami + container_name: "simple-service-bar3" + restart: unless-stopped + labels: + - "traefik.enable=true" + - "traefik.http.routers.router-bar3.rule=PathPrefix(`/bar3`)" + - "traefik.http.routers.router-bar3.entrypoints=web" + - "traefik.http.routers.router-bar3.middlewares=crowdsec2@docker" crowdsec: - image: crowdsecurity/crowdsec:v1.6.1-2 + image: crowdsecurity/crowdsec:v1.6.8 container_name: "crowdsec" restart: unless-stopped environment: @@ -67,7 +93,6 @@ services: - crowdsec-config-local:/etc/crowdsec/ labels: - "traefik.enable=false" - volumes: logs-local: crowdsec-db-local: diff --git a/docker-compose.yml b/docker-compose.yml index 8c5d952..8db2de8 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -59,7 +59,7 @@ services: - "traefik.http.middlewares.crowdsec.plugin.bouncer.forwardedheaderstrustedips=172.21.0.5" crowdsec: - image: crowdsecurity/crowdsec:v1.6.1-2 + image: crowdsecurity/crowdsec:v1.6.8 container_name: "crowdsec" restart: unless-stopped environment: diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index e6951c6..09d72ff 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -66,7 +66,8 @@ type Config struct { CrowdsecCapiPasswordFile string `json:"crowdsecCapiPasswordFile,omitempty"` CrowdsecCapiScenarios []string `json:"crowdsecCapiScenarios,omitempty"` UpdateIntervalSeconds int64 `json:"updateIntervalSeconds,omitempty"` - UpdateMaxFailure int `json:"updateMaxFailure,omitempty"` + MetricsUpdateIntervalSeconds int64 `json:"metricsUpdateIntervalSeconds,omitempty"` + UpdateMaxFailure int64 `json:"updateMaxFailure,omitempty"` DefaultDecisionSeconds int64 `json:"defaultDecisionSeconds,omitempty"` RemediationStatusCode int `json:"remediationStatusCode,omitempty"` HTTPTimeoutSeconds int64 `json:"httpTimeoutSeconds,omitempty"` @@ -118,6 +119,7 @@ func New() *Config { CrowdsecLapiKey: "", CrowdsecLapiTLSInsecureVerify: false, UpdateIntervalSeconds: 60, + MetricsUpdateIntervalSeconds: 600, UpdateMaxFailure: 0, DefaultDecisionSeconds: 60, RemediationStatusCode: http.StatusForbidden, @@ -340,13 +342,22 @@ func validateParamsRequired(config *Config) error { return fmt.Errorf("%v: cannot be empty", key) } } - requiredInt := map[string]int64{ + requiredInt0 := map[string]int64{ + "CrowdsecAppsecBodyLimit": config.CrowdsecAppsecBodyLimit, + "MetricsUpdateIntervalSeconds": config.MetricsUpdateIntervalSeconds, + } + for key, val := range requiredInt0 { + if val < 0 { + return fmt.Errorf("%v: cannot be less than 0", key) + } + } + requiredInt1 := map[string]int64{ "UpdateIntervalSeconds": config.UpdateIntervalSeconds, "DefaultDecisionSeconds": config.DefaultDecisionSeconds, "HTTPTimeoutSeconds": config.HTTPTimeoutSeconds, "CaptchaGracePeriodSeconds": config.CaptchaGracePeriodSeconds, } - for key, val := range requiredInt { + for key, val := range requiredInt1 { if val < 1 { return fmt.Errorf("%v: cannot be less than 1", key) } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 6f74894..a66cd19 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -1,5 +1,5 @@ // Package logger implements utility routines to write to stdout and stderr. -// It supports debug, info and error level +// It supports trace, debug, info and error level package logger import ( @@ -19,29 +19,43 @@ type Log struct { // New Set Default log level to info in case log level to defined. func New(logLevel string, logFilePath string) *Log { + // Initialize loggers with discard output logError := log.New(io.Discard, "ERROR: CrowdsecBouncerTraefikPlugin: ", log.Ldate|log.Ltime) logInfo := log.New(io.Discard, "INFO: CrowdsecBouncerTraefikPlugin: ", log.Ldate|log.Ltime) logDebug := log.New(io.Discard, "DEBUG: CrowdsecBouncerTraefikPlugin: ", log.Ldate|log.Ltime) - logError.SetOutput(os.Stderr) - logInfo.SetOutput(os.Stdout) // we initialize logger to STDOUT/STDERR first so if the file logger cannot be initialized we can inform the user - if logLevel == "DEBUG" { - logDebug.SetOutput(os.Stdout) - } + output := os.Stdout + errorOutput := os.Stderr + + // prepare file logging if specified if logFilePath != "" { logFile, err := os.OpenFile(filepath.Clean(logFilePath), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0600) - if err != nil { - _ = fmt.Errorf("LogFilePath is not writable %w", err) + if err == nil { + output = logFile + errorOutput = logFile } else { - logInfo.SetOutput(logFile) - logError.SetOutput(logFile) - if logLevel == "DEBUG" { - logDebug.SetOutput(logFile) - } + _ = fmt.Errorf("LogFilePath is not writable %w", err) } } + // Set error logger output + logError.SetOutput(errorOutput) + + // Configure log levels + switch logLevel { + case "ERROR": + // Only error logging is enabled + case "INFO": + logInfo.SetOutput(output) + case "DEBUG": + logInfo.SetOutput(output) + logDebug.SetOutput(output) + default: + // Default to INFO level + logInfo.SetOutput(output) + } + return &Log{ logError: logError, logInfo: logInfo,