| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2016 Datadog, Inc.
- //go:build appsec
- // +build appsec
- package appsec
- import (
- "encoding/json"
- "errors"
- "fmt"
- "sort"
- "sync"
- "sync/atomic"
- "time"
- "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/grpcsec"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/httpsec"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/waf"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
- )
- const (
- eventRulesVersionTag = "_dd.appsec.event_rules.version"
- eventRulesErrorsTag = "_dd.appsec.event_rules.errors"
- eventRulesLoadedTag = "_dd.appsec.event_rules.loaded"
- eventRulesFailedTag = "_dd.appsec.event_rules.error_count"
- wafDurationTag = "_dd.appsec.waf.duration"
- wafDurationExtTag = "_dd.appsec.waf.duration_ext"
- wafTimeoutTag = "_dd.appsec.waf.timeouts"
- wafVersionTag = "_dd.appsec.waf.version"
- )
- // Register the WAF event listener.
- func (a *appsec) registerWAF() (unreg dyngo.UnregisterFunc, err error) {
- // Check the WAF is healthy
- if err := waf.Health(); err != nil {
- return nil, err
- }
- // Instantiate the WAF
- waf, err := waf.NewHandle(a.cfg.rules, a.cfg.obfuscator.KeyRegex, a.cfg.obfuscator.ValueRegex)
- if err != nil {
- return nil, err
- }
- // Close the WAF in case of an error in what's following
- defer func() {
- if err != nil {
- waf.Close()
- }
- }()
- // Check if there are addresses in the rule
- ruleAddresses := waf.Addresses()
- if len(ruleAddresses) == 0 {
- return nil, errors.New("no addresses found in the rule")
- }
- // Check there are supported addresses in the rule
- httpAddresses, grpcAddresses, notSupported := supportedAddresses(ruleAddresses)
- if len(httpAddresses) == 0 && len(grpcAddresses) == 0 {
- return nil, fmt.Errorf("the addresses present in the rule are not supported: %v", notSupported)
- } else if len(notSupported) > 0 {
- log.Debug("appsec: the addresses present in the rule are partially supported: not supported=%v", notSupported)
- }
- // Register the WAF event listener
- var unregisterHTTP, unregisterGRPC dyngo.UnregisterFunc
- if len(httpAddresses) > 0 {
- log.Debug("appsec: registering http waf listening to addresses %v", httpAddresses)
- unregisterHTTP = dyngo.Register(newHTTPWAFEventListener(waf, httpAddresses, a.cfg.wafTimeout, a.limiter))
- }
- if len(grpcAddresses) > 0 {
- log.Debug("appsec: registering grpc waf listening to addresses %v", grpcAddresses)
- unregisterGRPC = dyngo.Register(newGRPCWAFEventListener(waf, grpcAddresses, a.cfg.wafTimeout, a.limiter))
- }
- if err := a.enableRCBlocking(wafHandleWrapper{waf}); err != nil {
- log.Error("appsec: Remote config: cannot enable blocking, rules data won't be updated: %v", err)
- }
- // Return an unregistration function that will also release the WAF instance.
- return func() {
- defer waf.Close()
- if unregisterHTTP != nil {
- unregisterHTTP()
- }
- if unregisterGRPC != nil {
- unregisterGRPC()
- }
- }, nil
- }
- // newWAFEventListener returns the WAF event listener to register in order to enable it.
- func newHTTPWAFEventListener(handle *waf.Handle, addresses []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
- var monitorRulesOnce sync.Once // per instantiation
- actionHandler := httpsec.NewActionsHandler()
- return httpsec.OnHandlerOperationStart(func(op *httpsec.Operation, args httpsec.HandlerOperationArgs) {
- var body interface{}
- wafCtx := waf.NewContext(handle)
- if wafCtx == nil {
- // The WAF event listener got concurrently released
- return
- }
- values := map[string]interface{}{}
- for _, addr := range addresses {
- if addr == httpClientIPAddr && args.ClientIP.IsValid() {
- values[httpClientIPAddr] = args.ClientIP.String()
- }
- }
- // TODO: suspicious request blocking by moving here all the addresses available when the request begins
- matches, actionIds := runWAF(wafCtx, values, timeout)
- if len(matches) > 0 {
- interrupt := false
- for _, id := range actionIds {
- interrupt = actionHandler.Apply(id, op) || interrupt
- }
- op.AddSecurityEvents(matches)
- log.Debug("appsec: WAF detected an attack before executing the request")
- if interrupt {
- wafCtx.Close()
- return
- }
- }
- op.On(httpsec.OnSDKBodyOperationStart(func(op *httpsec.SDKBodyOperation, args httpsec.SDKBodyOperationArgs) {
- body = args.Body
- }))
- // At the moment, AppSec doesn't block the requests, and so we can use the fact we are in monitoring-only mode
- // to call the WAF only once at the end of the handler operation.
- op.On(httpsec.OnHandlerOperationFinish(func(op *httpsec.Operation, res httpsec.HandlerOperationRes) {
- defer wafCtx.Close()
- // Run the WAF on the rule addresses available in the request args
- values := make(map[string]interface{}, len(addresses))
- for _, addr := range addresses {
- switch addr {
- case serverRequestRawURIAddr:
- values[serverRequestRawURIAddr] = args.RequestURI
- case serverRequestHeadersNoCookiesAddr:
- if headers := args.Headers; headers != nil {
- values[serverRequestHeadersNoCookiesAddr] = headers
- }
- case serverRequestCookiesAddr:
- if cookies := args.Cookies; cookies != nil {
- values[serverRequestCookiesAddr] = cookies
- }
- case serverRequestQueryAddr:
- if query := args.Query; query != nil {
- values[serverRequestQueryAddr] = query
- }
- case serverRequestPathParamsAddr:
- if pathParams := args.PathParams; pathParams != nil {
- values[serverRequestPathParamsAddr] = pathParams
- }
- case serverRequestBodyAddr:
- if body != nil {
- values[serverRequestBodyAddr] = body
- }
- case serverResponseStatusAddr:
- values[serverResponseStatusAddr] = res.Status
- }
- }
- // Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's
- // response is not supported at the moment.
- matches, _ := runWAF(wafCtx, values, timeout)
- // Add WAF metrics.
- rInfo := handle.RulesetInfo()
- overallRuntimeNs, internalRuntimeNs := wafCtx.TotalRuntime()
- addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs, internalRuntimeNs, wafCtx.TotalTimeouts())
- // Add the following metrics once per instantiation of a WAF handle
- monitorRulesOnce.Do(func() {
- addRulesMonitoringTags(op, rInfo)
- op.AddTag(ext.ManualKeep, samplernames.AppSec)
- })
- // Log the attacks if any
- if len(matches) == 0 {
- return
- }
- log.Debug("appsec: attack detected by the waf")
- if limiter.Allow() {
- op.AddSecurityEvents(matches)
- }
- }))
- })
- }
- // newGRPCWAFEventListener returns the WAF event listener to register in order
- // to enable it.
- func newGRPCWAFEventListener(handle *waf.Handle, addresses []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
- var monitorRulesOnce sync.Once // per instantiation
- actionHandler := grpcsec.NewActionsHandler()
- return grpcsec.OnHandlerOperationStart(func(op *grpcsec.HandlerOperation, handlerArgs grpcsec.HandlerOperationArgs) {
- // Limit the maximum number of security events, as a streaming RPC could
- // receive unlimited number of messages where we could find security events
- const maxWAFEventsPerRequest = 10
- var (
- nbEvents uint32
- logOnce sync.Once // per request
- overallRuntimeNs waf.AtomicU64
- internalRuntimeNs waf.AtomicU64
- nbTimeouts waf.AtomicU64
- events []json.RawMessage
- mu sync.Mutex // events mutex
- )
- wafCtx := waf.NewContext(handle)
- if wafCtx == nil {
- // The WAF event listener got concurrently released
- return
- }
- defer wafCtx.Close()
- // The same address is used for gRPC and http when it comes to client ip
- values := map[string]interface{}{}
- for _, addr := range addresses {
- if addr == httpClientIPAddr && handlerArgs.ClientIP.IsValid() {
- values[httpClientIPAddr] = handlerArgs.ClientIP.String()
- }
- }
- matches, actionIds := runWAF(wafCtx, values, timeout)
- if len(matches) > 0 {
- interrupt := false
- for _, id := range actionIds {
- interrupt = actionHandler.Apply(id, op) || interrupt
- }
- op.AddSecurityEvents(matches)
- log.Debug("appsec: WAF detected an attack before executing the request")
- if interrupt {
- return
- }
- }
- op.On(grpcsec.OnReceiveOperationFinish(func(_ grpcsec.ReceiveOperation, res grpcsec.ReceiveOperationRes) {
- if atomic.LoadUint32(&nbEvents) == maxWAFEventsPerRequest {
- logOnce.Do(func() {
- log.Debug("appsec: ignoring the rpc message due to the maximum number of security events per grpc call reached")
- })
- return
- }
- // The current workaround of the WAF context limitations is to
- // simply instantiate and release the WAF context for the operation
- // lifetime so that:
- // 1. We avoid growing the memory usage of the context every time
- // a grpc.server.request.message value is added to it during
- // the RPC lifetime.
- // 2. We avoid the limitation of 1 event per attack type.
- // TODO(Julio-Guerra): a future libddwaf API should solve this out.
- wafCtx := waf.NewContext(handle)
- if wafCtx == nil {
- // The WAF event listener got concurrently released
- return
- }
- defer wafCtx.Close()
- // Run the WAF on the rule addresses available in the args
- // Note that we don't check if the address is present in the rules
- // as we only support one at the moment, so this callback cannot be
- // set when the address is not present.
- values := map[string]interface{}{grpcServerRequestMessage: res.Message}
- if md := handlerArgs.Metadata; len(md) > 0 {
- values[grpcServerRequestMetadata] = md
- }
- // Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's
- // response is not supported at the moment.
- event, _ := runWAF(wafCtx, values, timeout)
- // WAF run durations are WAF context bound. As of now we need to keep track of those externally since
- // we use a new WAF context for each callback. When we are able to re-use the same WAF context across
- // callbacks, we can get rid of these variables and simply use the WAF bindings in OnHandlerOperationFinish.
- overall, internal := wafCtx.TotalRuntime()
- overallRuntimeNs.Add(overall)
- internalRuntimeNs.Add(internal)
- nbTimeouts.Add(wafCtx.TotalTimeouts())
- if len(event) == 0 {
- return
- }
- log.Debug("appsec: attack detected by the grpc waf")
- atomic.AddUint32(&nbEvents, 1)
- mu.Lock()
- events = append(events, event)
- mu.Unlock()
- }))
- op.On(grpcsec.OnHandlerOperationFinish(func(op *grpcsec.HandlerOperation, _ grpcsec.HandlerOperationRes) {
- rInfo := handle.RulesetInfo()
- addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs.Load(), internalRuntimeNs.Load(), nbTimeouts.Load())
- // Log the following metrics once per instantiation of a WAF handle
- monitorRulesOnce.Do(func() {
- addRulesMonitoringTags(op, rInfo)
- op.AddTag(ext.ManualKeep, samplernames.AppSec)
- })
- // Log the events if any
- if len(events) > 0 && limiter.Allow() {
- op.AddSecurityEvents(events...)
- }
- }))
- })
- }
- func runWAF(wafCtx *waf.Context, values map[string]interface{}, timeout time.Duration) ([]byte, []string) {
- matches, actions, err := wafCtx.Run(values, timeout)
- if err != nil {
- if err == waf.ErrTimeout {
- log.Debug("appsec: waf timeout value of %s reached", timeout)
- } else {
- log.Error("appsec: unexpected waf error: %v", err)
- return nil, nil
- }
- }
- return matches, actions
- }
- // HTTP rule addresses currently supported by the WAF
- const (
- serverRequestRawURIAddr = "server.request.uri.raw"
- serverRequestHeadersNoCookiesAddr = "server.request.headers.no_cookies"
- serverRequestCookiesAddr = "server.request.cookies"
- serverRequestQueryAddr = "server.request.query"
- serverRequestPathParamsAddr = "server.request.path_params"
- serverRequestBodyAddr = "server.request.body"
- serverResponseStatusAddr = "server.response.status"
- httpClientIPAddr = "http.client_ip"
- )
- // List of HTTP rule addresses currently supported by the WAF
- var httpAddresses = []string{
- serverRequestRawURIAddr,
- serverRequestHeadersNoCookiesAddr,
- serverRequestCookiesAddr,
- serverRequestQueryAddr,
- serverRequestPathParamsAddr,
- serverRequestBodyAddr,
- serverResponseStatusAddr,
- httpClientIPAddr,
- }
- // gRPC rule addresses currently supported by the WAF
- const (
- grpcServerRequestMessage = "grpc.server.request.message"
- grpcServerRequestMetadata = "grpc.server.request.metadata"
- )
- // List of gRPC rule addresses currently supported by the WAF
- var grpcAddresses = []string{
- grpcServerRequestMessage,
- grpcServerRequestMetadata,
- httpClientIPAddr,
- }
- func init() {
- // sort the address lists to avoid mistakes and use sort.SearchStrings()
- sort.Strings(httpAddresses)
- sort.Strings(grpcAddresses)
- }
- // supportedAddresses returns the list of addresses we actually support from the
- // given rule addresses.
- func supportedAddresses(ruleAddresses []string) (supportedHTTP, supportedGRPC, notSupported []string) {
- // Filter the supported addresses only
- for _, addr := range ruleAddresses {
- supported := false
- if i := sort.SearchStrings(httpAddresses, addr); i < len(httpAddresses) && httpAddresses[i] == addr {
- supportedHTTP = append(supportedHTTP, addr)
- supported = true
- }
- if i := sort.SearchStrings(grpcAddresses, addr); i < len(grpcAddresses) && grpcAddresses[i] == addr {
- supportedGRPC = append(supportedGRPC, addr)
- supported = true
- }
- if !supported {
- notSupported = append(notSupported, addr)
- }
- }
- return
- }
- type tagsHolder interface {
- AddTag(string, interface{})
- }
- // Add the tags related to security rules monitoring
- func addRulesMonitoringTags(th tagsHolder, rInfo waf.RulesetInfo) {
- if len(rInfo.Errors) == 0 {
- rInfo.Errors = nil
- }
- rulesetErrors, err := json.Marshal(rInfo.Errors)
- if err != nil {
- log.Error("appsec: could not marshal ruleset info errors to json")
- }
- th.AddTag(eventRulesErrorsTag, string(rulesetErrors)) // avoid the tracer's call to fmt.Sprintf on the value
- th.AddTag(eventRulesLoadedTag, float64(rInfo.Loaded))
- th.AddTag(eventRulesFailedTag, float64(rInfo.Failed))
- th.AddTag(wafVersionTag, waf.Version())
- }
- // Add the tags related to the monitoring of the WAF
- func addWAFMonitoringTags(th tagsHolder, rulesVersion string, overallRuntimeNs, internalRuntimeNs, timeouts uint64) {
- // Rules version is set for every request to help the backend associate WAF duration metrics with rule version
- th.AddTag(eventRulesVersionTag, rulesVersion)
- th.AddTag(wafTimeoutTag, float64(timeouts))
- th.AddTag(wafDurationTag, float64(internalRuntimeNs)/1e3) // ns to us
- th.AddTag(wafDurationExtTag, float64(overallRuntimeNs)/1e3) // ns to us
- }
|