operation.go 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. // Unless explicitly stated otherwise all files in this repository are licensed
  2. // under the Apache License Version 2.0.
  3. // This product includes software developed at Datadog (https://www.datadoghq.com/).
  4. // Copyright 2016 Datadog, Inc.
  5. // Package dyngo is the Go implementation of Datadog's Instrumentation Gateway
  6. // which provides an event-based instrumentation API based on a stack
  7. // representation of instrumented functions along with nested event listeners.
  8. // It allows correlating both past and future function calls in order to
  9. // react to and monitor specific function call scenarios, while keeping the
  10. // monitoring state local to the monitoring logic thanks to nested Go function
  11. // closures.
  12. // dyngo is not intended to be directly used and should be instead wrapped
  13. // behind statically and strongly typed wrapper types. Indeed, dyngo is a
  14. // generic implementation relying on empty interface values (values of type
  15. // `interface{}`) and using it directly can be error-prone due to the lack of
  16. // compile-time type-checking. For example, AppSec provides the package
  17. // `httpsec`, built on top of dyngo, as its HTTP instrumentation API and which
  18. // defines the abstract HTTP operation representation expected by the AppSec
  19. // monitoring.
  20. package dyngo
  21. import (
  22. "reflect"
  23. "sort"
  24. "sync"
  25. "sync/atomic"
  26. "gopkg.in/DataDog/dd-trace-go.v1/internal/log"
  27. )
  28. // Operation interface type allowing to register event listeners to the
  29. // operation. The event listeners will be automatically removed from the
  30. // operation once it finishes so that it no longer can be called on finished
  31. // operations.
  32. type Operation interface {
  33. // On allows to register an event listener to the operation. The event
  34. // listener will be removed from the operation once it finishes.
  35. On(EventListener)
  36. // Parent return the parent operation. It returns nil for the root
  37. // operation.
  38. Parent() Operation
  39. // emitEvent emits the event to listeners of the given argsType and calls
  40. // them with the given op and v values.
  41. // emitEvent is a private method implemented by the operation struct type so
  42. // that no other package can define it.
  43. emitEvent(argsType reflect.Type, op Operation, v interface{})
  44. // register the given event listeners and return the unregistration
  45. // function allowing to remove the event listener from the operation.
  46. // register is a private method implemented by the operation struct type so
  47. // that no other package can define it.
  48. register(...EventListener) UnregisterFunc
  49. // finish the operation. This method allows to pass the operation value to
  50. // use to emit the finish event.
  51. // finish is a private method implemented by the operation struct type so
  52. // that no other package can define it.
  53. finish(op Operation, results interface{})
  54. }
  55. // EventListener interface allowing to identify the Go type listened to and
  56. // dispatch calls to the underlying event listener function.
  57. type EventListener interface {
  58. // ListenedType returns the Go type the event listener listens to.
  59. ListenedType() reflect.Type
  60. // Call the underlying event listener function. The type of the value v
  61. // is the type the event listener listens to, according to the type
  62. // returned by ListenedType().
  63. Call(op Operation, v interface{})
  64. }
  65. // UnregisterFunc is a function allowing to unregister from an operation the
  66. // previously registered event listeners.
  67. type UnregisterFunc func()
  68. var rootOperation = newOperation(nil)
  69. // Register global operation event listeners to listen to.
  70. func Register(listeners ...EventListener) UnregisterFunc {
  71. return rootOperation.register(listeners...)
  72. }
  73. // operation structure allowing to subscribe to operation events and to
  74. // navigate in the operation stack. Events
  75. // bubble-up the operation stack, which allows listening to future events that
  76. // might happen in the operation lifetime.
  77. type operation struct {
  78. parent Operation
  79. eventRegister
  80. disabled bool
  81. mu sync.RWMutex
  82. }
  83. // NewOperation creates and returns a new operationIt must be started by calling
  84. // StartOperation, and finished by calling FinishOperation. The returned
  85. // operation should be used in wrapper types to provide statically typed start
  86. // and finish functions. The following example shows how to wrap an operation
  87. // so that its functions are statically typed (instead of dyngo's interface{}
  88. // values):
  89. // package mypackage
  90. // import "dyngo"
  91. // type (
  92. // MyOperation struct {
  93. // dyngo.Operation
  94. // }
  95. // MyOperationArgs { /* ... */ }
  96. // MyOperationRes { /* ... */ }
  97. // )
  98. // func StartOperation(args MyOperationArgs, parent dyngo.Operation) MyOperation {
  99. // op := MyOperation{Operation: dyngo.NewOperation(parent)}
  100. // dyngo.StartOperation(op, args)
  101. // return op
  102. // }
  103. // func (op MyOperation) Finish(res MyOperationRes) {
  104. // dyngo.FinishOperation(op, res)
  105. // }
  106. func NewOperation(parent Operation) Operation {
  107. if parent == nil {
  108. parent = rootOperation
  109. }
  110. return newOperation(parent)
  111. }
  112. // StartOperation starts a new operation along with its arguments and emits a
  113. // start event with the operation arguments.
  114. func StartOperation(op Operation, args interface{}) {
  115. argsType := reflect.TypeOf(args)
  116. // Bubble-up the start event starting from the parent operation as you can't
  117. // listen for your own start event
  118. for current := op.Parent(); current != nil; current = current.Parent() {
  119. current.emitEvent(argsType, op, args)
  120. }
  121. }
  122. func newOperation(parent Operation) *operation {
  123. return &operation{parent: parent}
  124. }
  125. // Parent return the parent operation. It returns nil for the root operation.
  126. func (o *operation) Parent() Operation {
  127. return o.parent
  128. }
  129. // FinishOperation finishes the operation along with its results and emits a
  130. // finish event with the operation results.
  131. // The operation is then disabled and its event listeners removed.
  132. func FinishOperation(op Operation, results interface{}) {
  133. op.finish(op, results)
  134. }
  135. func (o *operation) finish(op Operation, results interface{}) {
  136. // Defer the call to o.disable() first so that the RWMutex gets unlocked first
  137. defer o.disable()
  138. o.mu.RLock()
  139. defer o.mu.RUnlock() // Deferred and stacked on top of the previously deferred call to o.disable()
  140. if o.disabled {
  141. return
  142. }
  143. resType := reflect.TypeOf(results)
  144. for current := op; current != nil; current = current.Parent() {
  145. current.emitEvent(resType, op, results)
  146. }
  147. }
  148. // Disable the operation and remove all its event listeners.
  149. func (o *operation) disable() {
  150. o.mu.Lock()
  151. defer o.mu.Unlock()
  152. if o.disabled {
  153. return
  154. }
  155. o.disabled = true
  156. o.eventRegister.clear()
  157. }
  158. // Register allows to register the given event listeners to the operation. An
  159. // unregistration function is returned allowing to unregister the event
  160. // listeners from the operation.
  161. func (o *operation) register(l ...EventListener) UnregisterFunc {
  162. // eventRegisterIndex allows to lookup for the event listener in the event register.
  163. type eventRegisterIndex struct {
  164. key reflect.Type
  165. id eventListenerID
  166. }
  167. o.mu.RLock()
  168. defer o.mu.RUnlock()
  169. if o.disabled {
  170. return func() {}
  171. }
  172. indices := make([]eventRegisterIndex, len(l))
  173. for i, l := range l {
  174. if l == nil {
  175. continue
  176. }
  177. key := l.ListenedType()
  178. id := o.eventRegister.add(key, l)
  179. indices[i] = eventRegisterIndex{
  180. key: key,
  181. id: id,
  182. }
  183. }
  184. return func() {
  185. for _, ix := range indices {
  186. o.eventRegister.remove(ix.key, ix.id)
  187. }
  188. }
  189. }
  190. // On registers the event listener. The difference with the Register() is that
  191. // it doesn't return a function closure, which avoids unnecessary allocations
  192. // For example:
  193. // op.On(MyOperationStart(func (op MyOperation, args MyOperationArgs) {
  194. // // ...
  195. // }))
  196. func (o *operation) On(l EventListener) {
  197. o.mu.RLock()
  198. defer o.mu.RUnlock()
  199. if o.disabled {
  200. return
  201. }
  202. o.eventRegister.add(l.ListenedType(), l)
  203. }
  204. type (
  205. // eventRegister implements a thread-safe list of event listeners.
  206. eventRegister struct {
  207. mu sync.RWMutex
  208. listeners eventListenerMap
  209. }
  210. // eventListenerMap is the map of event listeners. The list of listeners are
  211. // indexed by the operation argument or result type the event listener
  212. // expects.
  213. eventListenerMap map[reflect.Type][]eventListenerMapEntry
  214. eventListenerMapEntry struct {
  215. id eventListenerID
  216. listener EventListener
  217. }
  218. // eventListenerID is the unique ID of an event when registering it. It
  219. // allows to find it back and remove it from the list of event listeners
  220. // when unregistering it.
  221. eventListenerID uint32
  222. )
  223. // lastID is the last event listener ID that was given to the latest event
  224. // listener.
  225. var lastID eventListenerID
  226. // nextID atomically increments lastID and returns the new event listener ID to
  227. // use.
  228. func nextID() eventListenerID {
  229. return eventListenerID(atomic.AddUint32((*uint32)(&lastID), 1))
  230. }
  231. func (r *eventRegister) add(key reflect.Type, l EventListener) eventListenerID {
  232. r.mu.Lock()
  233. defer r.mu.Unlock()
  234. if r.listeners == nil {
  235. r.listeners = make(eventListenerMap)
  236. }
  237. // id is computed when the lock is exclusively taken so that we know
  238. // listeners are added in incremental id order.
  239. // This allows to use the optimized sort.Search() function to remove the
  240. // entry.
  241. id := nextID()
  242. r.listeners[key] = append(r.listeners[key], eventListenerMapEntry{
  243. id: id,
  244. listener: l,
  245. })
  246. return id
  247. }
  248. func (r *eventRegister) remove(key reflect.Type, id eventListenerID) {
  249. r.mu.Lock()
  250. defer r.mu.Unlock()
  251. if r.listeners == nil {
  252. return
  253. }
  254. listeners := r.listeners[key]
  255. length := len(listeners)
  256. i := sort.Search(length, func(i int) bool {
  257. return listeners[i].id >= id
  258. })
  259. if i < length && listeners[i].id == id {
  260. r.listeners[key] = append(listeners[:i], listeners[i+1:]...)
  261. }
  262. }
  263. func (r *eventRegister) clear() {
  264. r.mu.Lock()
  265. defer r.mu.Unlock()
  266. r.listeners = nil
  267. }
  268. func (r *eventRegister) emitEvent(key reflect.Type, op Operation, v interface{}) {
  269. defer func() {
  270. if r := recover(); r != nil {
  271. log.Error("appsec: recovered from an unexpected panic from an event listener: %+v", r)
  272. }
  273. }()
  274. r.mu.RLock()
  275. defer r.mu.RUnlock()
  276. for _, e := range r.listeners[key] {
  277. e.listener.Call(op, v)
  278. }
  279. }