| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- // Unless explicitly stated otherwise all files in this repository are licensed
- // under the Apache License Version 2.0.
- // This product includes software developed at Datadog (https://www.datadoghq.com/).
- // Copyright 2022 Datadog, Inc.
- package remoteconfig
- import (
- "bytes"
- "crypto/rand"
- "encoding/hex"
- "encoding/json"
- "fmt"
- "io"
- "math/big"
- "net/http"
- "strings"
- "time"
- rc "github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
- "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
- )
- // Callback represents a function that can process a remote config update.
- // A Callback function can be registered to a remote config client to automatically
- // react upon receiving updates. This function returns the configuration processing status
- // for each config file received through the update.
- type Callback func(u ProductUpdate) map[string]rc.ApplyStatus
- // Capability represents a bit index to be set in clientData.Capabilites in order to register a client
- // for a specific capability
- type Capability uint
- const (
- _ Capability = iota
- // ASMActivation represents the capability to activate ASM through remote configuration
- ASMActivation
- // ASMIPBlocking represents the capability for ASM to block requests based on user IP
- ASMIPBlocking
- // ASMDDRules represents the capability to update the rules used by the ASM WAF for threat detection
- ASMDDRules
- )
- // ProductUpdate represents an update for a specific product.
- // It is a map of file path to raw file content
- type ProductUpdate map[string][]byte
- // A Client interacts with an Agent to update and track the state of remote
- // configuration
- type Client struct {
- ClientConfig
- clientID string
- endpoint string
- repository *rc.Repository
- stop chan struct{}
- callbacks map[string][]Callback
- lastError error
- }
- // NewClient creates a new remoteconfig Client
- func NewClient(config ClientConfig) (*Client, error) {
- repo, err := rc.NewUnverifiedRepository()
- if err != nil {
- return nil, err
- }
- if config.HTTP == nil {
- config.HTTP = DefaultClientConfig().HTTP
- }
- return &Client{
- ClientConfig: config,
- clientID: generateID(),
- endpoint: fmt.Sprintf("%s/v0.7/config", config.AgentURL),
- repository: repo,
- stop: make(chan struct{}),
- lastError: nil,
- callbacks: map[string][]Callback{},
- }, nil
- }
- // Start starts the client's update poll loop in a fresh goroutine
- func (c *Client) Start() {
- go func() {
- ticker := time.NewTicker(c.PollInterval)
- defer ticker.Stop()
- for {
- select {
- case <-c.stop:
- return
- case <-ticker.C:
- c.updateState()
- }
- }
- }()
- }
- // Stop stops the client's update poll loop
- func (c *Client) Stop() {
- close(c.stop)
- }
- func (c *Client) updateState() {
- data, err := c.newUpdateRequest()
- if err != nil {
- log.Error("remoteconfig: unexpected error while creating a new update request payload: %v", err)
- return
- }
- req, err := http.NewRequest(http.MethodGet, c.endpoint, &data)
- if err != nil {
- log.Error("remoteconfig: unexpected error while creating a new http request: %v", err)
- return
- }
- resp, err := c.HTTP.Do(req)
- if err != nil {
- log.Debug("remoteconfig: http request error: %v", err)
- return
- }
- // Flush and close the response body when returning (cf. https://pkg.go.dev/net/http#Client.Do)
- defer func() {
- io.ReadAll(resp.Body)
- resp.Body.Close()
- }()
- if sc := resp.StatusCode; sc != http.StatusOK {
- log.Debug("remoteconfig: http request error: response status code is not 200 (OK) but %s", http.StatusText(sc))
- return
- }
- respBody, err := io.ReadAll(resp.Body)
- if err != nil {
- log.Error("remoteconfig: http request error: could not read the response body: %v", err)
- return
- }
- if body := string(respBody); body == `{}` || body == `null` {
- return
- }
- var update clientGetConfigsResponse
- if err := json.Unmarshal(respBody, &update); err != nil {
- log.Error("remoteconfig: http request error: could not parse the json response body: %v", err)
- return
- }
- c.lastError = c.applyUpdate(&update)
- }
- // RegisterCallback allows registering a callback that will be invoked when the client
- // receives a configuration update for the specified product.
- func (c *Client) RegisterCallback(f Callback, product string) {
- c.callbacks[product] = append(c.callbacks[product], f)
- }
- func (c *Client) applyUpdate(pbUpdate *clientGetConfigsResponse) error {
- fileMap := make(map[string][]byte, len(pbUpdate.TargetFiles))
- productUpdates := make(map[string]ProductUpdate, len(c.Products))
- for _, f := range pbUpdate.TargetFiles {
- fileMap[f.Path] = f.Raw
- for _, p := range c.Products {
- productUpdates[p] = make(ProductUpdate)
- if strings.Contains(f.Path, p) {
- productUpdates[p][f.Path] = f.Raw
- }
- }
- }
- mapify := func(s *rc.RepositoryState) map[string]string {
- m := make(map[string]string)
- for i := range s.Configs {
- path := s.CachedFiles[i].Path
- product := s.Configs[i].Product
- m[path] = product
- }
- return m
- }
- // Check the repository state before and after the update to detect which configs are not being sent anymore.
- // This is needed because some products can stop sending configurations, and we want to make sure that the subscribers
- // are provided with this information in this case
- stateBefore, err := c.repository.CurrentState()
- if err != nil {
- return fmt.Errorf("repository current state error: %v", err)
- }
- products, err := c.repository.Update(rc.Update{
- TUFRoots: pbUpdate.Roots,
- TUFTargets: pbUpdate.Targets,
- TargetFiles: fileMap,
- ClientConfigs: pbUpdate.ClientConfigs,
- })
- if err != nil {
- return fmt.Errorf("repository update error: %v", err)
- }
- stateAfter, err := c.repository.CurrentState()
- if err != nil {
- return fmt.Errorf("repository current state error after update: %v", err)
- }
- // Create a config files diff between before/after the update to see which config files are missing
- mBefore := mapify(&stateBefore)
- for k := range mapify(&stateAfter) {
- delete(mBefore, k)
- }
- // Set the payload data to nil for missing config files. The callbacks then can handle the nil config case to detect
- // that this config will not be updated anymore.
- updatedProducts := make(map[string]struct{})
- for path, product := range mBefore {
- if productUpdates[product] == nil {
- productUpdates[product] = make(ProductUpdate)
- }
- productUpdates[product][path] = nil
- updatedProducts[product] = struct{}{}
- }
- // Aggregate updated products and missing products so that callbacks get called for both
- for _, p := range products {
- updatedProducts[p] = struct{}{}
- }
- // Performs the callbacks registered for all updated products and update the application status in the repository
- // (RCTE2)
- for p := range updatedProducts {
- for _, fn := range c.callbacks[p] {
- for path, status := range fn(productUpdates[p]) {
- c.repository.UpdateApplyStatus(path, status)
- }
- }
- }
- return nil
- }
- func (c *Client) newUpdateRequest() (bytes.Buffer, error) {
- state, err := c.repository.CurrentState()
- if err != nil {
- return bytes.Buffer{}, err
- }
- // Temporary check while using untrusted repo, for which no initial root file is provided
- if state.RootsVersion < 1 {
- state.RootsVersion = 1
- }
- pbCachedFiles := make([]*targetFileMeta, 0, len(state.CachedFiles))
- for _, f := range state.CachedFiles {
- pbHashes := make([]*targetFileHash, 0, len(f.Hashes))
- for alg, hash := range f.Hashes {
- pbHashes = append(pbHashes, &targetFileHash{
- Algorithm: alg,
- Hash: hex.EncodeToString(hash),
- })
- }
- pbCachedFiles = append(pbCachedFiles, &targetFileMeta{
- Path: f.Path,
- Length: int64(f.Length),
- Hashes: pbHashes,
- })
- }
- hasError := c.lastError != nil
- errMsg := ""
- if hasError {
- errMsg = c.lastError.Error()
- }
- var pbConfigState []*configState
- if !hasError {
- pbConfigState = make([]*configState, 0, len(state.Configs))
- for _, f := range state.Configs {
- pbConfigState = append(pbConfigState, &configState{
- ID: f.ID,
- Version: f.Version,
- Product: f.Product,
- ApplyState: f.ApplyStatus.State,
- ApplyError: f.ApplyStatus.Error,
- })
- }
- }
- cap := big.NewInt(0)
- for _, i := range c.Capabilities {
- cap.SetBit(cap, int(i), 1)
- }
- req := clientGetConfigsRequest{
- Client: &clientData{
- State: &clientState{
- RootVersion: uint64(state.RootsVersion),
- TargetsVersion: uint64(state.TargetsVersion),
- ConfigStates: pbConfigState,
- HasError: hasError,
- Error: errMsg,
- },
- ID: c.clientID,
- Products: c.Products,
- IsTracer: true,
- ClientTracer: &clientTracer{
- RuntimeID: c.RuntimeID,
- Language: "go",
- TracerVersion: c.TracerVersion,
- Service: c.ServiceName,
- Env: c.Env,
- AppVersion: c.AppVersion,
- },
- Capabilities: cap.Bytes(),
- },
- CachedTargetFiles: pbCachedFiles,
- }
- var b bytes.Buffer
- err = json.NewEncoder(&b).Encode(&req)
- if err != nil {
- return bytes.Buffer{}, err
- }
- return b, nil
- }
- var (
- idSize = 21
- idAlphabet = []rune("_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
- )
- func generateID() string {
- bytes := make([]byte, idSize)
- _, err := rand.Read(bytes)
- if err != nil {
- panic(err)
- }
- id := make([]rune, idSize)
- for i := 0; i < idSize; i++ {
- id[i] = idAlphabet[bytes[i]&63]
- }
- return string(id[:idSize])
- }
|