waf.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. //go:build appsec
  6. // +build appsec
  7. package appsec
  8. import (
  9. "encoding/json"
  10. "errors"
  11. "fmt"
  12. "sort"
  13. "sync"
  14. "sync/atomic"
  15. "time"
  16. "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
  17. "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo"
  18. "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/grpcsec"
  19. "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/dyngo/instrumentation/httpsec"
  20. "gopkg.in/DataDog/dd-trace-go.v1/internal/appsec/waf"
  21. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  22. "gopkg.in/DataDog/dd-trace-go.v1/internal/samplernames"
  23. )
  24. const (
  25. eventRulesVersionTag = "_dd.appsec.event_rules.version"
  26. eventRulesErrorsTag = "_dd.appsec.event_rules.errors"
  27. eventRulesLoadedTag = "_dd.appsec.event_rules.loaded"
  28. eventRulesFailedTag = "_dd.appsec.event_rules.error_count"
  29. wafDurationTag = "_dd.appsec.waf.duration"
  30. wafDurationExtTag = "_dd.appsec.waf.duration_ext"
  31. wafTimeoutTag = "_dd.appsec.waf.timeouts"
  32. wafVersionTag = "_dd.appsec.waf.version"
  33. )
  34. // Register the WAF event listener.
  35. func (a *appsec) registerWAF() (unreg dyngo.UnregisterFunc, err error) {
  36. // Check the WAF is healthy
  37. if err := waf.Health(); err != nil {
  38. return nil, err
  39. }
  40. // Instantiate the WAF
  41. waf, err := waf.NewHandle(a.cfg.rules, a.cfg.obfuscator.KeyRegex, a.cfg.obfuscator.ValueRegex)
  42. if err != nil {
  43. return nil, err
  44. }
  45. // Close the WAF in case of an error in what's following
  46. defer func() {
  47. if err != nil {
  48. waf.Close()
  49. }
  50. }()
  51. // Check if there are addresses in the rule
  52. ruleAddresses := waf.Addresses()
  53. if len(ruleAddresses) == 0 {
  54. return nil, errors.New("no addresses found in the rule")
  55. }
  56. // Check there are supported addresses in the rule
  57. httpAddresses, grpcAddresses, notSupported := supportedAddresses(ruleAddresses)
  58. if len(httpAddresses) == 0 && len(grpcAddresses) == 0 {
  59. return nil, fmt.Errorf("the addresses present in the rule are not supported: %v", notSupported)
  60. } else if len(notSupported) > 0 {
  61. log.Debug("appsec: the addresses present in the rule are partially supported: not supported=%v", notSupported)
  62. }
  63. // Register the WAF event listener
  64. var unregisterHTTP, unregisterGRPC dyngo.UnregisterFunc
  65. if len(httpAddresses) > 0 {
  66. log.Debug("appsec: registering http waf listening to addresses %v", httpAddresses)
  67. unregisterHTTP = dyngo.Register(newHTTPWAFEventListener(waf, httpAddresses, a.cfg.wafTimeout, a.limiter))
  68. }
  69. if len(grpcAddresses) > 0 {
  70. log.Debug("appsec: registering grpc waf listening to addresses %v", grpcAddresses)
  71. unregisterGRPC = dyngo.Register(newGRPCWAFEventListener(waf, grpcAddresses, a.cfg.wafTimeout, a.limiter))
  72. }
  73. if err := a.enableRCBlocking(wafHandleWrapper{waf}); err != nil {
  74. log.Error("appsec: Remote config: cannot enable blocking, rules data won't be updated: %v", err)
  75. }
  76. // Return an unregistration function that will also release the WAF instance.
  77. return func() {
  78. defer waf.Close()
  79. if unregisterHTTP != nil {
  80. unregisterHTTP()
  81. }
  82. if unregisterGRPC != nil {
  83. unregisterGRPC()
  84. }
  85. }, nil
  86. }
  87. // newWAFEventListener returns the WAF event listener to register in order to enable it.
  88. func newHTTPWAFEventListener(handle *waf.Handle, addresses []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
  89. var monitorRulesOnce sync.Once // per instantiation
  90. actionHandler := httpsec.NewActionsHandler()
  91. return httpsec.OnHandlerOperationStart(func(op *httpsec.Operation, args httpsec.HandlerOperationArgs) {
  92. var body interface{}
  93. wafCtx := waf.NewContext(handle)
  94. if wafCtx == nil {
  95. // The WAF event listener got concurrently released
  96. return
  97. }
  98. values := map[string]interface{}{}
  99. for _, addr := range addresses {
  100. if addr == httpClientIPAddr && args.ClientIP.IsValid() {
  101. values[httpClientIPAddr] = args.ClientIP.String()
  102. }
  103. }
  104. // TODO: suspicious request blocking by moving here all the addresses available when the request begins
  105. matches, actionIds := runWAF(wafCtx, values, timeout)
  106. if len(matches) > 0 {
  107. interrupt := false
  108. for _, id := range actionIds {
  109. interrupt = actionHandler.Apply(id, op) || interrupt
  110. }
  111. op.AddSecurityEvents(matches)
  112. log.Debug("appsec: WAF detected an attack before executing the request")
  113. if interrupt {
  114. wafCtx.Close()
  115. return
  116. }
  117. }
  118. op.On(httpsec.OnSDKBodyOperationStart(func(op *httpsec.SDKBodyOperation, args httpsec.SDKBodyOperationArgs) {
  119. body = args.Body
  120. }))
  121. // At the moment, AppSec doesn't block the requests, and so we can use the fact we are in monitoring-only mode
  122. // to call the WAF only once at the end of the handler operation.
  123. op.On(httpsec.OnHandlerOperationFinish(func(op *httpsec.Operation, res httpsec.HandlerOperationRes) {
  124. defer wafCtx.Close()
  125. // Run the WAF on the rule addresses available in the request args
  126. values := make(map[string]interface{}, len(addresses))
  127. for _, addr := range addresses {
  128. switch addr {
  129. case serverRequestRawURIAddr:
  130. values[serverRequestRawURIAddr] = args.RequestURI
  131. case serverRequestHeadersNoCookiesAddr:
  132. if headers := args.Headers; headers != nil {
  133. values[serverRequestHeadersNoCookiesAddr] = headers
  134. }
  135. case serverRequestCookiesAddr:
  136. if cookies := args.Cookies; cookies != nil {
  137. values[serverRequestCookiesAddr] = cookies
  138. }
  139. case serverRequestQueryAddr:
  140. if query := args.Query; query != nil {
  141. values[serverRequestQueryAddr] = query
  142. }
  143. case serverRequestPathParamsAddr:
  144. if pathParams := args.PathParams; pathParams != nil {
  145. values[serverRequestPathParamsAddr] = pathParams
  146. }
  147. case serverRequestBodyAddr:
  148. if body != nil {
  149. values[serverRequestBodyAddr] = body
  150. }
  151. case serverResponseStatusAddr:
  152. values[serverResponseStatusAddr] = res.Status
  153. }
  154. }
  155. // Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's
  156. // response is not supported at the moment.
  157. matches, _ := runWAF(wafCtx, values, timeout)
  158. // Add WAF metrics.
  159. rInfo := handle.RulesetInfo()
  160. overallRuntimeNs, internalRuntimeNs := wafCtx.TotalRuntime()
  161. addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs, internalRuntimeNs, wafCtx.TotalTimeouts())
  162. // Add the following metrics once per instantiation of a WAF handle
  163. monitorRulesOnce.Do(func() {
  164. addRulesMonitoringTags(op, rInfo)
  165. op.AddTag(ext.ManualKeep, samplernames.AppSec)
  166. })
  167. // Log the attacks if any
  168. if len(matches) == 0 {
  169. return
  170. }
  171. log.Debug("appsec: attack detected by the waf")
  172. if limiter.Allow() {
  173. op.AddSecurityEvents(matches)
  174. }
  175. }))
  176. })
  177. }
  178. // newGRPCWAFEventListener returns the WAF event listener to register in order
  179. // to enable it.
  180. func newGRPCWAFEventListener(handle *waf.Handle, addresses []string, timeout time.Duration, limiter Limiter) dyngo.EventListener {
  181. var monitorRulesOnce sync.Once // per instantiation
  182. actionHandler := grpcsec.NewActionsHandler()
  183. return grpcsec.OnHandlerOperationStart(func(op *grpcsec.HandlerOperation, handlerArgs grpcsec.HandlerOperationArgs) {
  184. // Limit the maximum number of security events, as a streaming RPC could
  185. // receive unlimited number of messages where we could find security events
  186. const maxWAFEventsPerRequest = 10
  187. var (
  188. nbEvents uint32
  189. logOnce sync.Once // per request
  190. overallRuntimeNs waf.AtomicU64
  191. internalRuntimeNs waf.AtomicU64
  192. nbTimeouts waf.AtomicU64
  193. events []json.RawMessage
  194. mu sync.Mutex // events mutex
  195. )
  196. wafCtx := waf.NewContext(handle)
  197. if wafCtx == nil {
  198. // The WAF event listener got concurrently released
  199. return
  200. }
  201. defer wafCtx.Close()
  202. // The same address is used for gRPC and http when it comes to client ip
  203. values := map[string]interface{}{}
  204. for _, addr := range addresses {
  205. if addr == httpClientIPAddr && handlerArgs.ClientIP.IsValid() {
  206. values[httpClientIPAddr] = handlerArgs.ClientIP.String()
  207. }
  208. }
  209. matches, actionIds := runWAF(wafCtx, values, timeout)
  210. if len(matches) > 0 {
  211. interrupt := false
  212. for _, id := range actionIds {
  213. interrupt = actionHandler.Apply(id, op) || interrupt
  214. }
  215. op.AddSecurityEvents(matches)
  216. log.Debug("appsec: WAF detected an attack before executing the request")
  217. if interrupt {
  218. return
  219. }
  220. }
  221. op.On(grpcsec.OnReceiveOperationFinish(func(_ grpcsec.ReceiveOperation, res grpcsec.ReceiveOperationRes) {
  222. if atomic.LoadUint32(&nbEvents) == maxWAFEventsPerRequest {
  223. logOnce.Do(func() {
  224. log.Debug("appsec: ignoring the rpc message due to the maximum number of security events per grpc call reached")
  225. })
  226. return
  227. }
  228. // The current workaround of the WAF context limitations is to
  229. // simply instantiate and release the WAF context for the operation
  230. // lifetime so that:
  231. // 1. We avoid growing the memory usage of the context every time
  232. // a grpc.server.request.message value is added to it during
  233. // the RPC lifetime.
  234. // 2. We avoid the limitation of 1 event per attack type.
  235. // TODO(Julio-Guerra): a future libddwaf API should solve this out.
  236. wafCtx := waf.NewContext(handle)
  237. if wafCtx == nil {
  238. // The WAF event listener got concurrently released
  239. return
  240. }
  241. defer wafCtx.Close()
  242. // Run the WAF on the rule addresses available in the args
  243. // Note that we don't check if the address is present in the rules
  244. // as we only support one at the moment, so this callback cannot be
  245. // set when the address is not present.
  246. values := map[string]interface{}{grpcServerRequestMessage: res.Message}
  247. if md := handlerArgs.Metadata; len(md) > 0 {
  248. values[grpcServerRequestMetadata] = md
  249. }
  250. // Run the WAF, ignoring the returned actions - if any - since blocking after the request handler's
  251. // response is not supported at the moment.
  252. event, _ := runWAF(wafCtx, values, timeout)
  253. // WAF run durations are WAF context bound. As of now we need to keep track of those externally since
  254. // we use a new WAF context for each callback. When we are able to re-use the same WAF context across
  255. // callbacks, we can get rid of these variables and simply use the WAF bindings in OnHandlerOperationFinish.
  256. overall, internal := wafCtx.TotalRuntime()
  257. overallRuntimeNs.Add(overall)
  258. internalRuntimeNs.Add(internal)
  259. nbTimeouts.Add(wafCtx.TotalTimeouts())
  260. if len(event) == 0 {
  261. return
  262. }
  263. log.Debug("appsec: attack detected by the grpc waf")
  264. atomic.AddUint32(&nbEvents, 1)
  265. mu.Lock()
  266. events = append(events, event)
  267. mu.Unlock()
  268. }))
  269. op.On(grpcsec.OnHandlerOperationFinish(func(op *grpcsec.HandlerOperation, _ grpcsec.HandlerOperationRes) {
  270. rInfo := handle.RulesetInfo()
  271. addWAFMonitoringTags(op, rInfo.Version, overallRuntimeNs.Load(), internalRuntimeNs.Load(), nbTimeouts.Load())
  272. // Log the following metrics once per instantiation of a WAF handle
  273. monitorRulesOnce.Do(func() {
  274. addRulesMonitoringTags(op, rInfo)
  275. op.AddTag(ext.ManualKeep, samplernames.AppSec)
  276. })
  277. // Log the events if any
  278. if len(events) > 0 && limiter.Allow() {
  279. op.AddSecurityEvents(events...)
  280. }
  281. }))
  282. })
  283. }
  284. func runWAF(wafCtx *waf.Context, values map[string]interface{}, timeout time.Duration) ([]byte, []string) {
  285. matches, actions, err := wafCtx.Run(values, timeout)
  286. if err != nil {
  287. if err == waf.ErrTimeout {
  288. log.Debug("appsec: waf timeout value of %s reached", timeout)
  289. } else {
  290. log.Error("appsec: unexpected waf error: %v", err)
  291. return nil, nil
  292. }
  293. }
  294. return matches, actions
  295. }
  296. // HTTP rule addresses currently supported by the WAF
  297. const (
  298. serverRequestRawURIAddr = "server.request.uri.raw"
  299. serverRequestHeadersNoCookiesAddr = "server.request.headers.no_cookies"
  300. serverRequestCookiesAddr = "server.request.cookies"
  301. serverRequestQueryAddr = "server.request.query"
  302. serverRequestPathParamsAddr = "server.request.path_params"
  303. serverRequestBodyAddr = "server.request.body"
  304. serverResponseStatusAddr = "server.response.status"
  305. httpClientIPAddr = "http.client_ip"
  306. )
  307. // List of HTTP rule addresses currently supported by the WAF
  308. var httpAddresses = []string{
  309. serverRequestRawURIAddr,
  310. serverRequestHeadersNoCookiesAddr,
  311. serverRequestCookiesAddr,
  312. serverRequestQueryAddr,
  313. serverRequestPathParamsAddr,
  314. serverRequestBodyAddr,
  315. serverResponseStatusAddr,
  316. httpClientIPAddr,
  317. }
  318. // gRPC rule addresses currently supported by the WAF
  319. const (
  320. grpcServerRequestMessage = "grpc.server.request.message"
  321. grpcServerRequestMetadata = "grpc.server.request.metadata"
  322. )
  323. // List of gRPC rule addresses currently supported by the WAF
  324. var grpcAddresses = []string{
  325. grpcServerRequestMessage,
  326. grpcServerRequestMetadata,
  327. httpClientIPAddr,
  328. }
  329. func init() {
  330. // sort the address lists to avoid mistakes and use sort.SearchStrings()
  331. sort.Strings(httpAddresses)
  332. sort.Strings(grpcAddresses)
  333. }
  334. // supportedAddresses returns the list of addresses we actually support from the
  335. // given rule addresses.
  336. func supportedAddresses(ruleAddresses []string) (supportedHTTP, supportedGRPC, notSupported []string) {
  337. // Filter the supported addresses only
  338. for _, addr := range ruleAddresses {
  339. supported := false
  340. if i := sort.SearchStrings(httpAddresses, addr); i < len(httpAddresses) && httpAddresses[i] == addr {
  341. supportedHTTP = append(supportedHTTP, addr)
  342. supported = true
  343. }
  344. if i := sort.SearchStrings(grpcAddresses, addr); i < len(grpcAddresses) && grpcAddresses[i] == addr {
  345. supportedGRPC = append(supportedGRPC, addr)
  346. supported = true
  347. }
  348. if !supported {
  349. notSupported = append(notSupported, addr)
  350. }
  351. }
  352. return
  353. }
  354. type tagsHolder interface {
  355. AddTag(string, interface{})
  356. }
  357. // Add the tags related to security rules monitoring
  358. func addRulesMonitoringTags(th tagsHolder, rInfo waf.RulesetInfo) {
  359. if len(rInfo.Errors) == 0 {
  360. rInfo.Errors = nil
  361. }
  362. rulesetErrors, err := json.Marshal(rInfo.Errors)
  363. if err != nil {
  364. log.Error("appsec: could not marshal ruleset info errors to json")
  365. }
  366. th.AddTag(eventRulesErrorsTag, string(rulesetErrors)) // avoid the tracer's call to fmt.Sprintf on the value
  367. th.AddTag(eventRulesLoadedTag, float64(rInfo.Loaded))
  368. th.AddTag(eventRulesFailedTag, float64(rInfo.Failed))
  369. th.AddTag(wafVersionTag, waf.Version())
  370. }
  371. // Add the tags related to the monitoring of the WAF
  372. func addWAFMonitoringTags(th tagsHolder, rulesVersion string, overallRuntimeNs, internalRuntimeNs, timeouts uint64) {
  373. // Rules version is set for every request to help the backend associate WAF duration metrics with rule version
  374. th.AddTag(eventRulesVersionTag, rulesVersion)
  375. th.AddTag(wafTimeoutTag, float64(timeouts))
  376. th.AddTag(wafDurationTag, float64(internalRuntimeNs)/1e3) // ns to us
  377. th.AddTag(wafDurationExtTag, float64(overallRuntimeNs)/1e3) // ns to us
  378. }