cache.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package runtime
  15. import (
  16. "sync"
  17. "time"
  18. )
  19. // Cache stores the PodStatus for the pods. It represents *all* the visible
  20. // pods/containers in the container runtime. All cache entries are at least as
  21. // new or newer than the global timestamp (set by UpdateTime()), while
  22. // individual entries may be slightly newer than the global timestamp. If a pod
  23. // has no states known by the runtime, Cache returns an empty PodStatus object
  24. // with ID populated.
  25. //
  26. // Cache provides two methods to retrieve the PodStatus: the non-blocking Get()
  27. // and the blocking GetNewerThan() method. The component responsible for
  28. // populating the cache is expected to call Delete() to explicitly free the
  29. // cache entries.
  30. type Cache interface {
  31. Get(string) (*PodStatus, error)
  32. Set(string, *PodStatus, error, time.Time)
  33. // GetNewerThan is a blocking call that only returns the status
  34. // when it is newer than the given time.
  35. GetNewerThan(string, time.Time) (*PodStatus, error)
  36. Delete(string)
  37. UpdateTime(time.Time)
  38. }
  39. type data struct {
  40. // Status of the pod.
  41. status *PodStatus
  42. // Error got when trying to inspect the pod.
  43. err error
  44. // Time when the data was last modified.
  45. modified time.Time
  46. }
  47. type subRecord struct {
  48. time time.Time
  49. ch chan *data
  50. }
  51. // cache implements Cache.
  52. type cache struct {
  53. // Lock which guards all internal data structures.
  54. lock sync.RWMutex
  55. // Map that stores the pod statuses.
  56. pods map[string]*data
  57. // A global timestamp represents how fresh the cached data is. All
  58. // cache content is at the least newer than this timestamp. Note that the
  59. // timestamp is nil after initialization, and will only become non-nil when
  60. // it is ready to serve the cached statuses.
  61. timestamp *time.Time
  62. // Map that stores the subscriber records.
  63. subscribers map[string][]*subRecord
  64. }
  65. // NewCache creates a pod cache.
  66. func NewCache() Cache {
  67. return &cache{
  68. pods: map[string]*data{},
  69. subscribers: map[string][]*subRecord{},
  70. }
  71. }
  72. // Get returns the PodStatus for the pod; callers are expected not to
  73. // modify the objects returned.
  74. func (c *cache) Get(id string) (*PodStatus, error) {
  75. c.lock.RLock()
  76. defer c.lock.RUnlock()
  77. d := c.get(id)
  78. return d.status, d.err
  79. }
  80. func (c *cache) GetNewerThan(id string, minTime time.Time) (*PodStatus, error) {
  81. ch := c.subscribe(id, minTime)
  82. d := <-ch
  83. return d.status, d.err
  84. }
  85. // Set sets the PodStatus for the pod.
  86. func (c *cache) Set(id string, status *PodStatus, err error, timestamp time.Time) {
  87. c.lock.Lock()
  88. defer c.lock.Unlock()
  89. defer c.notify(id, timestamp)
  90. c.pods[id] = &data{status: status, err: err, modified: timestamp}
  91. }
  92. // Delete removes the entry of the pod.
  93. func (c *cache) Delete(id string) {
  94. c.lock.Lock()
  95. defer c.lock.Unlock()
  96. delete(c.pods, id)
  97. }
  98. // UpdateTime modifies the global timestamp of the cache and notify
  99. // subscribers if needed.
  100. func (c *cache) UpdateTime(timestamp time.Time) {
  101. c.lock.Lock()
  102. defer c.lock.Unlock()
  103. c.timestamp = &timestamp
  104. // Notify all the subscribers if the condition is met.
  105. for id := range c.subscribers {
  106. c.notify(id, *c.timestamp)
  107. }
  108. }
  109. func makeDefaultData(id string) *data {
  110. return &data{status: &PodStatus{ID: id}, err: nil}
  111. }
  112. func (c *cache) get(id string) *data {
  113. d, ok := c.pods[id]
  114. if !ok {
  115. // Cache should store *all* pod/container information known by the
  116. // container runtime. A cache miss indicates that there are no states
  117. // regarding the pod last time we queried the container runtime.
  118. // What this *really* means is that there are no visible pod/containers
  119. // associated with this pod. Simply return an default (mostly empty)
  120. // PodStatus to reflect this.
  121. return makeDefaultData(id)
  122. }
  123. return d
  124. }
  125. // getIfNewerThan returns the data it is newer than the given time.
  126. // Otherwise, it returns nil. The caller should acquire the lock.
  127. func (c *cache) getIfNewerThan(id string, minTime time.Time) *data {
  128. d, ok := c.pods[id]
  129. globalTimestampIsNewer := (c.timestamp != nil && c.timestamp.After(minTime))
  130. if !ok && globalTimestampIsNewer {
  131. // Status is not cached, but the global timestamp is newer than
  132. // minTime, return the default status.
  133. return makeDefaultData(id)
  134. }
  135. if ok && (d.modified.After(minTime) || globalTimestampIsNewer) {
  136. // Status is cached, return status if either of the following is true.
  137. // * status was modified after minTime
  138. // * the global timestamp of the cache is newer than minTime.
  139. return d
  140. }
  141. // The pod status is not ready.
  142. return nil
  143. }
  144. // notify sends notifications for pod with the given id, if the requirements
  145. // are met. Note that the caller should acquire the lock.
  146. func (c *cache) notify(id string, timestamp time.Time) {
  147. list, ok := c.subscribers[id]
  148. if !ok {
  149. // No one to notify.
  150. return
  151. }
  152. newList := []*subRecord{}
  153. for i, r := range list {
  154. if timestamp.Before(r.time) {
  155. // Doesn't meet the time requirement; keep the record.
  156. newList = append(newList, list[i])
  157. continue
  158. }
  159. r.ch <- c.get(id)
  160. close(r.ch)
  161. }
  162. if len(newList) == 0 {
  163. delete(c.subscribers, id)
  164. } else {
  165. c.subscribers[id] = newList
  166. }
  167. }
  168. func (c *cache) subscribe(id string, timestamp time.Time) chan *data {
  169. ch := make(chan *data, 1)
  170. c.lock.Lock()
  171. defer c.lock.Unlock()
  172. d := c.getIfNewerThan(id, timestamp)
  173. if d != nil {
  174. // If the cache entry is ready, send the data and return immediately.
  175. ch <- d
  176. return ch
  177. }
  178. // Add the subscription record.
  179. c.subscribers[id] = append(c.subscribers[id], &subRecord{time: timestamp, ch: ch})
  180. return ch
  181. }