remoteconfig.go 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2022 Datadog, Inc.
  5. package remoteconfig
  6. import (
  7. "bytes"
  8. "crypto/rand"
  9. "encoding/hex"
  10. "encoding/json"
  11. "fmt"
  12. "io"
  13. "math/big"
  14. "net/http"
  15. "strings"
  16. "time"
  17. rc "github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
  18. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  19. )
// Callback represents a function that can process a remote config update.
// A Callback function can be registered to a remote config client (see
// Client.RegisterCallback) to automatically react upon receiving updates for a
// given product.
//
// The ProductUpdate argument maps config file paths to their raw content. A nil
// content is used by the client for config files that were known before the
// update and are no longer part of the repository state after it.
//
// The function returns the configuration processing status for each config file
// received through the update, keyed by config file path. The client forwards
// every returned entry to the repository through UpdateApplyStatus.
type Callback func(u ProductUpdate) map[string]rc.ApplyStatus
// Capability represents a bit index to be set in clientData.Capabilities in order to register a client
// for a specific capability. The client turns the list of capabilities of its
// configuration into a bitset where the bit at each Capability index is set to 1.
type Capability uint

const (
	// Index 0 is skipped on purpose: the first usable capability index is 1.
	_ Capability = iota
	// ASMActivation represents the capability to activate ASM through remote configuration (bit index 1)
	ASMActivation
	// ASMIPBlocking represents the capability for ASM to block requests based on user IP (bit index 2)
	ASMIPBlocking
	// ASMDDRules represents the capability to update the rules used by the ASM WAF for threat detection (bit index 3)
	ASMDDRules
)
// ProductUpdate represents an update for a specific product.
// It is a map of file path to raw file content.
// The client sets the content of a path to nil when the corresponding config
// file was part of the repository state before an update and is no longer part
// of it afterwards, so that callbacks can detect removed configurations.
type ProductUpdate map[string][]byte
// A Client interacts with an Agent to update and track the state of remote
// configuration. It must be created with NewClient, which initializes the
// unexported fields below.
type Client struct {
	// ClientConfig is the user-provided configuration (embedded), giving direct
	// access to fields such as HTTP, PollInterval, Products or Capabilities.
	ClientConfig

	// clientID is the random identifier generated once in NewClient and sent
	// with every update request.
	clientID string
	// endpoint is the agent URL requests are sent to ("<AgentURL>/v0.7/config").
	endpoint string
	// repository holds the local remote config state (configs, cached files, versions).
	repository *rc.Repository
	// stop is closed by Stop to terminate the poll loop started by Start.
	stop chan struct{}
	// callbacks maps a product name to the callbacks registered for it.
	callbacks map[string][]Callback
	// lastError is the result of the last applied update; when non-nil it is
	// reported to the agent in the next update request.
	lastError error
}
  51. // NewClient creates a new remoteconfig Client
  52. func NewClient(config ClientConfig) (*Client, error) {
  53. repo, err := rc.NewUnverifiedRepository()
  54. if err != nil {
  55. return nil, err
  56. }
  57. if config.HTTP == nil {
  58. config.HTTP = DefaultClientConfig().HTTP
  59. }
  60. return &Client{
  61. ClientConfig: config,
  62. clientID: generateID(),
  63. endpoint: fmt.Sprintf("%s/v0.7/config", config.AgentURL),
  64. repository: repo,
  65. stop: make(chan struct{}),
  66. lastError: nil,
  67. callbacks: map[string][]Callback{},
  68. }, nil
  69. }
  70. // Start starts the client's update poll loop in a fresh goroutine
  71. func (c *Client) Start() {
  72. go func() {
  73. ticker := time.NewTicker(c.PollInterval)
  74. defer ticker.Stop()
  75. for {
  76. select {
  77. case <-c.stop:
  78. return
  79. case <-ticker.C:
  80. c.updateState()
  81. }
  82. }
  83. }()
  84. }
  85. // Stop stops the client's update poll loop
  86. func (c *Client) Stop() {
  87. close(c.stop)
  88. }
  89. func (c *Client) updateState() {
  90. data, err := c.newUpdateRequest()
  91. if err != nil {
  92. log.Error("remoteconfig: unexpected error while creating a new update request payload: %v", err)
  93. return
  94. }
  95. req, err := http.NewRequest(http.MethodGet, c.endpoint, &data)
  96. if err != nil {
  97. log.Error("remoteconfig: unexpected error while creating a new http request: %v", err)
  98. return
  99. }
  100. resp, err := c.HTTP.Do(req)
  101. if err != nil {
  102. log.Debug("remoteconfig: http request error: %v", err)
  103. return
  104. }
  105. // Flush and close the response body when returning (cf. https://pkg.go.dev/net/http#Client.Do)
  106. defer func() {
  107. io.ReadAll(resp.Body)
  108. resp.Body.Close()
  109. }()
  110. if sc := resp.StatusCode; sc != http.StatusOK {
  111. log.Debug("remoteconfig: http request error: response status code is not 200 (OK) but %s", http.StatusText(sc))
  112. return
  113. }
  114. respBody, err := io.ReadAll(resp.Body)
  115. if err != nil {
  116. log.Error("remoteconfig: http request error: could not read the response body: %v", err)
  117. return
  118. }
  119. if body := string(respBody); body == `{}` || body == `null` {
  120. return
  121. }
  122. var update clientGetConfigsResponse
  123. if err := json.Unmarshal(respBody, &update); err != nil {
  124. log.Error("remoteconfig: http request error: could not parse the json response body: %v", err)
  125. return
  126. }
  127. c.lastError = c.applyUpdate(&update)
  128. }
  129. // RegisterCallback allows registering a callback that will be invoked when the client
  130. // receives a configuration update for the specified product.
  131. func (c *Client) RegisterCallback(f Callback, product string) {
  132. c.callbacks[product] = append(c.callbacks[product], f)
  133. }
  134. func (c *Client) applyUpdate(pbUpdate *clientGetConfigsResponse) error {
  135. fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
  136. productUpdates := make(map[string]ProductUpdate, len(c.Products))
  137. for _, f := range pbUpdate.TargetFiles {
  138. fileMap[f.Path] = f.Raw
  139. for _, p := range c.Products {
  140. productUpdates[p] = make(ProductUpdate)
  141. if strings.Contains(f.Path, p) {
  142. productUpdates[p][f.Path] = f.Raw
  143. }
  144. }
  145. }
  146. mapify := func(s *rc.RepositoryState) map[string]string {
  147. m := make(map[string]string)
  148. for i := range s.Configs {
  149. path := s.CachedFiles[i].Path
  150. product := s.Configs[i].Product
  151. m[path] = product
  152. }
  153. return m
  154. }
  155. // Check the repository state before and after the update to detect which configs are not being sent anymore.
  156. // This is needed because some products can stop sending configurations, and we want to make sure that the subscribers
  157. // are provided with this information in this case
  158. stateBefore, err := c.repository.CurrentState()
  159. if err != nil {
  160. return fmt.Errorf("repository current state error: %v", err)
  161. }
  162. products, err := c.repository.Update(rc.Update{
  163. TUFRoots: pbUpdate.Roots,
  164. TUFTargets: pbUpdate.Targets,
  165. TargetFiles: fileMap,
  166. ClientConfigs: pbUpdate.ClientConfigs,
  167. })
  168. if err != nil {
  169. return fmt.Errorf("repository update error: %v", err)
  170. }
  171. stateAfter, err := c.repository.CurrentState()
  172. if err != nil {
  173. return fmt.Errorf("repository current state error after update: %v", err)
  174. }
  175. // Create a config files diff between before/after the update to see which config files are missing
  176. mBefore := mapify(&stateBefore)
  177. for k := range mapify(&stateAfter) {
  178. delete(mBefore, k)
  179. }
  180. // Set the payload data to nil for missing config files. The callbacks then can handle the nil config case to detect
  181. // that this config will not be updated anymore.
  182. updatedProducts := make(map[string]struct{})
  183. for path, product := range mBefore {
  184. if productUpdates[product] == nil {
  185. productUpdates[product] = make(ProductUpdate)
  186. }
  187. productUpdates[product][path] = nil
  188. updatedProducts[product] = struct{}{}
  189. }
  190. // Aggregate updated products and missing products so that callbacks get called for both
  191. for _, p := range products {
  192. updatedProducts[p] = struct{}{}
  193. }
  194. // Performs the callbacks registered for all updated products and update the application status in the repository
  195. // (RCTE2)
  196. for p := range updatedProducts {
  197. for _, fn := range c.callbacks[p] {
  198. for path, status := range fn(productUpdates[p]) {
  199. c.repository.UpdateApplyStatus(path, status)
  200. }
  201. }
  202. }
  203. return nil
  204. }
  205. func (c *Client) newUpdateRequest() (bytes.Buffer, error) {
  206. state, err := c.repository.CurrentState()
  207. if err != nil {
  208. return bytes.Buffer{}, err
  209. }
  210. // Temporary check while using untrusted repo, for which no initial root file is provided
  211. if state.RootsVersion < 1 {
  212. state.RootsVersion = 1
  213. }
  214. pbCachedFiles := make([]*targetFileMeta, 0, len(state.CachedFiles))
  215. for _, f := range state.CachedFiles {
  216. pbHashes := make([]*targetFileHash, 0, len(f.Hashes))
  217. for alg, hash := range f.Hashes {
  218. pbHashes = append(pbHashes, &targetFileHash{
  219. Algorithm: alg,
  220. Hash: hex.EncodeToString(hash),
  221. })
  222. }
  223. pbCachedFiles = append(pbCachedFiles, &targetFileMeta{
  224. Path: f.Path,
  225. Length: int64(f.Length),
  226. Hashes: pbHashes,
  227. })
  228. }
  229. hasError := c.lastError != nil
  230. errMsg := ""
  231. if hasError {
  232. errMsg = c.lastError.Error()
  233. }
  234. var pbConfigState []*configState
  235. if !hasError {
  236. pbConfigState = make([]*configState, 0, len(state.Configs))
  237. for _, f := range state.Configs {
  238. pbConfigState = append(pbConfigState, &configState{
  239. ID: f.ID,
  240. Version: f.Version,
  241. Product: f.Product,
  242. ApplyState: f.ApplyStatus.State,
  243. ApplyError: f.ApplyStatus.Error,
  244. })
  245. }
  246. }
  247. cap := big.NewInt(0)
  248. for _, i := range c.Capabilities {
  249. cap.SetBit(cap, int(i), 1)
  250. }
  251. req := clientGetConfigsRequest{
  252. Client: &clientData{
  253. State: &clientState{
  254. RootVersion: uint64(state.RootsVersion),
  255. TargetsVersion: uint64(state.TargetsVersion),
  256. ConfigStates: pbConfigState,
  257. HasError: hasError,
  258. Error: errMsg,
  259. },
  260. ID: c.clientID,
  261. Products: c.Products,
  262. IsTracer: true,
  263. ClientTracer: &clientTracer{
  264. RuntimeID: c.RuntimeID,
  265. Language: "go",
  266. TracerVersion: c.TracerVersion,
  267. Service: c.ServiceName,
  268. Env: c.Env,
  269. AppVersion: c.AppVersion,
  270. },
  271. Capabilities: cap.Bytes(),
  272. },
  273. CachedTargetFiles: pbCachedFiles,
  274. }
  275. var b bytes.Buffer
  276. err = json.NewEncoder(&b).Encode(&req)
  277. if err != nil {
  278. return bytes.Buffer{}, err
  279. }
  280. return b, nil
  281. }
  282. var (
  283. idSize = 21
  284. idAlphabet = []rune("_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
  285. )
  286. func generateID() string {
  287. bytes := make([]byte, idSize)
  288. _, err := rand.Read(bytes)
  289. if err != nil {
  290. panic(err)
  291. }
  292. id := make([]rune, idSize)
  293. for i := 0; i < idSize; i++ {
  294. id[i] = idAlphabet[bytes[i]&63]
  295. }
  296. return string(id[:idSize])
  297. }