bus.go 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package bus
  15. import (
  16. "context"
  17. "reflect"
  18. "yunion.io/x/pkg/errors"
  19. )
  // Untyped building blocks of the bus API.
  type (
  	// HandlerFunc holds any handler value. It is deliberately untyped because
  	// handlers are inspected and invoked through reflection.
  	HandlerFunc interface{}

  	// CtxHandlerFunc is a function that takes no arguments and returns nothing.
  	CtxHandlerFunc func()

  	// Msg is the untyped envelope for anything sent over the bus.
  	Msg interface{}
  )
  // ErrHandlerNotFound is the sentinel error returned by Dispatch and DispatchCtx
  // when no handler is registered under the message's type name.
  var ErrHandlerNotFound = errors.Error("handler not found")
  // TransactionManager abstracts how a callback is run inside a transaction.
  type TransactionManager interface {
  	// InTransaction invokes fn with a context and reports an error.
  	// NOTE(review): begin/commit/rollback semantics are entirely up to the
  	// implementation; the built-in noop manager simply calls fn(ctx).
  	InTransaction(ctx context.Context, fn func(ctx context.Context) error) error
  }
  // Bus is the message bus contract: message dispatch, event publication,
  // handler registration and transaction scoping.
  type Bus interface {
  	// Dispatch sends msg to the handler registered for its type and returns
  	// that handler's error.
  	Dispatch(msg Msg) error
  	// DispatchCtx is like Dispatch but forwards ctx to a context-aware handler.
  	DispatchCtx(ctx context.Context, msg Msg) error
  	// Publish delivers msg to the event listeners registered for its type.
  	Publish(msg Msg) error

  	// InTransaction starts a transaction and stores it in the context.
  	// The caller can then pass a function with multiple DispatchCtx calls that
  	// will all be executed in the same transaction. InTransaction will roll back
  	// if the callback returns an error.
  	// NOTE(review): this is the intended contract; the actual behaviour depends on
  	// the installed TransactionManager (the default noop manager only calls fn).
  	InTransaction(ctx context.Context, fn func(ctx context.Context) error) error

  	// AddHandler registers a handler whose first parameter is the message.
  	AddHandler(handler HandlerFunc)
  	// AddHandlerCtx registers a handler whose second parameter is the message;
  	// DispatchCtx passes the context as the first argument.
  	AddHandlerCtx(handler HandlerFunc)
  	// AddEventListener registers a listener for messages sent through Publish.
  	AddEventListener(handler HandlerFunc)

  	// SetTransactionManager allows the user to replace the internal
  	// noop TransactionManager that is responsible for managing
  	// transactions in `InTransaction`.
  	SetTransactionManager(tm TransactionManager)
  }
  // noopTransactionManager is a TransactionManager that opens no transaction at
  // all; it is what New installs on a fresh bus.
  type noopTransactionManager struct{}

  // InTransaction runs fn with ctx passed through unchanged and hands back
  // whatever fn returned.
  func (*noopTransactionManager) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  	err := fn(ctx)
  	return err
  }
  // InProcBus is the in-process Bus implementation. Handlers and listeners are
  // stored in maps keyed by message type name.
  type InProcBus struct {
  	// handlers maps a message type name to a handler that takes only the message.
  	handlers map[string]HandlerFunc
  	// handlersWithCtx maps a message type name to a handler that takes
  	// (context, message).
  	handlersWithCtx map[string]HandlerFunc
  	// listeners maps an event type name to its listeners, in registration order.
  	listeners map[string][]HandlerFunc
  	// txMng runs the callbacks passed to InTransaction.
  	txMng TransactionManager
  }
  // InTransaction delegates to the bus's configured TransactionManager, passing
  // ctx and fn through unchanged and returning its result.
  func (b *InProcBus) InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  	return b.txMng.InTransaction(ctx, fn)
  }
  // globalBus is the package-wide bus used by the package-level helper functions.
  // TODO: temporary; how the bus instance should be owned and initialised has not
  // been decided yet.
  var globalBus = New()
  67. // New initialize the bus
  68. func New() Bus {
  69. bus := &InProcBus{}
  70. bus.handlers = make(map[string]HandlerFunc)
  71. bus.handlersWithCtx = make(map[string]HandlerFunc)
  72. bus.listeners = make(map[string][]HandlerFunc)
  73. bus.txMng = &noopTransactionManager{}
  74. return bus
  75. }
  // GetBus returns the package-level global bus.
  // TODO: the intention is to get rid of the global bus.
  func GetBus() Bus {
  	return globalBus
  }
  80. // SetTransactionManager function assign a transaction manager to the bus.
  81. func (b *InProcBus) SetTransactionManager(tm TransactionManager) {
  82. b.txMng = tm
  83. }
  84. // DispatchCtx function dispatch a message to the bus context.
  85. func (b *InProcBus) DispatchCtx(ctx context.Context, msg Msg) error {
  86. var msgName = reflect.TypeOf(msg).Elem().Name()
  87. var handler = b.handlersWithCtx[msgName]
  88. if handler == nil {
  89. return ErrHandlerNotFound
  90. }
  91. var params = []reflect.Value{}
  92. params = append(params, reflect.ValueOf(ctx))
  93. params = append(params, reflect.ValueOf(msg))
  94. ret := reflect.ValueOf(handler).Call(params)
  95. err := ret[0].Interface()
  96. if err == nil {
  97. return nil
  98. }
  99. return err.(error)
  100. }
  101. // Dispatch function dispatch a message to the bus.
  102. func (b *InProcBus) Dispatch(msg Msg) error {
  103. var msgName = reflect.TypeOf(msg).Elem().Name()
  104. var handler = b.handlersWithCtx[msgName]
  105. withCtx := true
  106. if handler == nil {
  107. withCtx = false
  108. handler = b.handlers[msgName]
  109. }
  110. if handler == nil {
  111. return ErrHandlerNotFound
  112. }
  113. var params = []reflect.Value{}
  114. if withCtx {
  115. params = append(params, reflect.ValueOf(context.Background()))
  116. }
  117. params = append(params, reflect.ValueOf(msg))
  118. ret := reflect.ValueOf(handler).Call(params)
  119. err := ret[0].Interface()
  120. if err == nil {
  121. return nil
  122. }
  123. return err.(error)
  124. }
  125. // Publish function publish a message to the bus listener.
  126. func (b *InProcBus) Publish(msg Msg) error {
  127. var msgName = reflect.TypeOf(msg).Elem().Name()
  128. var listeners = b.listeners[msgName]
  129. var params = make([]reflect.Value, 1)
  130. params[0] = reflect.ValueOf(msg)
  131. for _, listenerHandler := range listeners {
  132. ret := reflect.ValueOf(listenerHandler).Call(params)
  133. err := ret[0].Interface()
  134. if err != nil {
  135. return err.(error)
  136. }
  137. }
  138. return nil
  139. }
  140. func (b *InProcBus) AddHandler(handler HandlerFunc) {
  141. handlerType := reflect.TypeOf(handler)
  142. queryTypeName := handlerType.In(0).Elem().Name()
  143. b.handlers[queryTypeName] = handler
  144. }
  145. func (b *InProcBus) AddHandlerCtx(handler HandlerFunc) {
  146. handlerType := reflect.TypeOf(handler)
  147. queryTypeName := handlerType.In(1).Elem().Name()
  148. b.handlersWithCtx[queryTypeName] = handler
  149. }
  150. func (b *InProcBus) AddEventListener(handler HandlerFunc) {
  151. handlerType := reflect.TypeOf(handler)
  152. eventName := handlerType.In(0).Elem().Name()
  153. _, exists := b.listeners[eventName]
  154. if !exists {
  155. b.listeners[eventName] = make([]HandlerFunc, 0)
  156. }
  157. b.listeners[eventName] = append(b.listeners[eventName], handler)
  158. }
  // AddHandler attaches a handler function to the global bus by calling its
  // AddHandler method. implName is accepted but not used by this function.
  // Package-level function.
  func AddHandler(implName string, handler HandlerFunc) {
  	globalBus.AddHandler(handler)
  }
  // AddHandlerCtx attaches a context-aware handler function to the global bus by
  // calling its AddHandlerCtx method. implName is accepted but not used by this
  // function.
  // Package-level function.
  func AddHandlerCtx(implName string, handler HandlerFunc) {
  	globalBus.AddHandlerCtx(handler)
  }
  // AddEventListener attaches a listener function to the global bus by calling
  // its AddEventListener method.
  // Package-level function.
  func AddEventListener(handler HandlerFunc) {
  	globalBus.AddEventListener(handler)
  }
  // Dispatch sends msg through the global bus and returns the result of its
  // Dispatch method.
  func Dispatch(msg Msg) error {
  	return globalBus.Dispatch(msg)
  }
  // DispatchCtx sends msg with ctx through the global bus and returns the result
  // of its DispatchCtx method.
  func DispatchCtx(ctx context.Context, msg Msg) error {
  	return globalBus.DispatchCtx(ctx, msg)
  }
  // Publish publishes msg on the global bus and returns the result of its
  // Publish method.
  func Publish(msg Msg) error {
  	return globalBus.Publish(msg)
  }
  // InTransaction runs fn through the global bus's InTransaction method and
  // returns its result.
  //
  // The intended contract is that a transaction is started and stored in the
  // context, so that multiple DispatchCtx calls made inside fn all execute in the
  // same transaction, and that the transaction is rolled back if fn returns an
  // error.
  // NOTE(review): whether that actually happens depends on the transaction
  // manager installed on the global bus; the default noop manager only calls fn.
  func InTransaction(ctx context.Context, fn func(ctx context.Context) error) error {
  	return globalBus.InTransaction(ctx, fn)
  }
  // ClearBusHandlers replaces the global bus with a freshly constructed one from
  // New, so nothing registered on the previous instance remains reachable through
  // the package-level functions.
  // NOTE(review): New also installs a fresh noop transaction manager, so a manager
  // set earlier on the global bus is dropped as well; Bus values obtained earlier
  // from GetBus keep referring to the old instance.
  func ClearBusHandlers() {
  	globalBus = New()
  }