hastate.go 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212
  1. // Copyright 2019 Yunion
  2. //
  3. // Licensed under the Apache License, Version 2.0 (the "License");
  4. // you may not use this file except in compliance with the License.
  5. // You may obtain a copy of the License at
  6. //
  7. // http://www.apache.org/licenses/LICENSE-2.0
  8. //
  9. // Unless required by applicable law or agreed to in writing, software
  10. // distributed under the License is distributed on an "AS IS" BASIS,
  11. // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. // See the License for the specific language governing permissions and
  13. // limitations under the License.
  14. package lbagent
  15. import (
  16. "context"
  17. "fmt"
  18. "io/ioutil"
  19. "os"
  20. "path"
  21. "strings"
  22. "sync"
  23. "syscall"
  24. "time"
  25. "unicode"
  26. "github.com/fsnotify/fsnotify"
  27. "yunion.io/x/log"
  28. api "yunion.io/x/onecloud/pkg/apis/compute"
  29. agentutils "yunion.io/x/onecloud/pkg/lbagent/utils"
  30. )
const (
	// HA_STATE_SCRIPT_NAME is the file name of the generated notify script;
	// NewHaStateWatcher joins it onto the haproxy share directory.
	HA_STATE_SCRIPT_NAME = "ha_state.sh"
	// HA_STATE_SCRIPT_CONTENT is the fmt template of the notify script. Its
	// single %s verb receives the ha state file path. The script echoes its
	// arguments to /proc/1/fd/1 and then overwrites the state file with the
	// same arguments. The leading newline of the raw string is trimmed by
	// NewHaStateWatcher before the script is written, so "#!/bin/bash" ends up
	// on the first line of the file.
	// NOTE(review): presumably invoked by keepalived as its notify script,
	// judging by the echoed message - confirm against the keepalived config.
	HA_STATE_SCRIPT_CONTENT = `
#!/bin/bash
echo "keepalived notify $@" > /proc/1/fd/1
echo "$@" >%s
`
	// HA_STATE_FILENAME is the file name of the state file; NewHaStateWatcher
	// joins it onto the haproxy run directory.
	HA_STATE_FILENAME = "ha_state"
)
// HaStateProvider is the read-side view of an HA state source: a channel that
// delivers state strings and the path of the script that records them.
type HaStateProvider interface {
	// StateChannel returns a receive-only channel of HA state strings.
	StateChannel() <-chan string
	// StateScript returns the path of the HA state notify script.
	StateScript() string
}
// HaStateWatcher watches the haproxy run directory with fsnotify and
// publishes the HA state parsed from the state file on channel C.
// Instances are built by NewHaStateWatcher.
type HaStateWatcher struct {
	// HaStateScriptPath is the full path of the generated notify script.
	HaStateScriptPath string
	// HaStatePath is the full path of the state file written by that script.
	HaStatePath string
	// CurrentState is the most recently loaded state; it is set to
	// api.LB_HA_STATE_UNKNOWN whenever loading the state file fails.
	CurrentState string // TODO hide it

	// opts is the options value the watcher was created with.
	opts *Options
	// w is the fsnotify watcher registered on opts.haproxyRunDir.
	w *fsnotify.Watcher
	// C carries state strings to the consumer; NewHaStateWatcher creates it
	// unbuffered.
	C chan string
}
  52. func (hsw *HaStateWatcher) StateChannel() <-chan string {
  53. return hsw.C
  54. }
  55. func (hsw *HaStateWatcher) StateScript() string {
  56. return hsw.HaStateScriptPath
  57. }
  58. func (hsw *HaStateWatcher) Run(ctx context.Context) {
  59. defer func() {
  60. log.Infof("ha state watcher bye")
  61. wg := ctx.Value("wg").(*sync.WaitGroup)
  62. wg.Done()
  63. }()
  64. hsw.loadHaState()
  65. hsw.C <- hsw.CurrentState
  66. statePending := false
  67. tick := time.NewTicker(3 * time.Second)
  68. defer tick.Stop()
  69. for {
  70. select {
  71. case ev := <-hsw.w.Events:
  72. switch {
  73. case ev.Name == hsw.opts.haproxyRunDir:
  74. switch ev.Op {
  75. case fsnotify.Remove, fsnotify.Rename:
  76. log.Errorf("run dir %s", ev.Op.String())
  77. hsw.sayBye()
  78. default:
  79. log.Debugf("ignored %s", ev)
  80. }
  81. case ev.Name == hsw.HaStatePath:
  82. log.Infof("hastate file: %s", ev)
  83. switch ev.Op {
  84. case fsnotify.Create, fsnotify.Write:
  85. err := hsw.loadHaState()
  86. if err != nil {
  87. log.Errorf("load state: %s", err)
  88. hsw.sayBye()
  89. }
  90. select {
  91. case hsw.C <- hsw.CurrentState:
  92. statePending = false
  93. default:
  94. statePending = true
  95. }
  96. }
  97. }
  98. case err := <-hsw.w.Errors:
  99. log.Errorf("watcher err: %s", err)
  100. hsw.sayBye()
  101. case <-tick.C:
  102. if statePending {
  103. select {
  104. case hsw.C <- hsw.CurrentState:
  105. statePending = false
  106. default:
  107. }
  108. }
  109. case <-ctx.Done():
  110. return
  111. }
  112. }
  113. }
  114. func (hsw *HaStateWatcher) loadHaState() (err error) {
  115. defer func() {
  116. if err != nil {
  117. hsw.CurrentState = api.LB_HA_STATE_UNKNOWN
  118. }
  119. }()
  120. data, err := ioutil.ReadFile(hsw.HaStatePath)
  121. if err != nil {
  122. return err
  123. }
  124. log.Infof("got state: %s", data)
  125. // Sample content
  126. //
  127. // INSTANCE YunionLB BACKUP 110
  128. //
  129. fields := strings.Fields(string(data))
  130. if len(fields) >= 3 {
  131. hsw.CurrentState = fields[2]
  132. return
  133. }
  134. err = fmt.Errorf("ha state file contains too little info")
  135. return
  136. }
  137. func (hsw *HaStateWatcher) sayBye() {
  138. syscall.Kill(os.Getpid(), syscall.SIGTERM)
  139. }
  140. func NewHaStateWatcher(opts *Options) (hsw *HaStateWatcher, err error) {
  141. var (
  142. w *fsnotify.Watcher
  143. )
  144. defer func() {
  145. if err != nil {
  146. if w != nil {
  147. w.Close()
  148. }
  149. }
  150. }()
  151. haStateScriptPath := path.Join(opts.haproxyShareDir, HA_STATE_SCRIPT_NAME)
  152. haStatePath := path.Join(opts.haproxyRunDir, HA_STATE_FILENAME)
  153. {
  154. content := fmt.Sprintf(HA_STATE_SCRIPT_CONTENT, haStatePath)
  155. content = strings.TrimLeftFunc(content, unicode.IsSpace)
  156. mode := agentutils.FileModeFileExec
  157. err = ioutil.WriteFile(haStateScriptPath, []byte(content), mode)
  158. if err != nil {
  159. return
  160. }
  161. var fi os.FileInfo
  162. fi, err = os.Stat(haStateScriptPath)
  163. if err != nil {
  164. return
  165. }
  166. if fi.Mode() != mode {
  167. err = os.Chmod(haStateScriptPath, mode)
  168. if err != nil {
  169. return
  170. }
  171. }
  172. }
  173. w, err = fsnotify.NewWatcher()
  174. if err != nil {
  175. return
  176. }
  177. err = w.Add(opts.haproxyRunDir)
  178. if err != nil {
  179. return
  180. }
  181. hsw = &HaStateWatcher{
  182. HaStateScriptPath: haStateScriptPath,
  183. HaStatePath: haStatePath,
  184. CurrentState: api.LB_HA_STATE_UNKNOWN,
  185. opts: opts,
  186. w: w,
  187. C: make(chan string),
  188. }
  189. return
  190. }