datastore_file.go 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412
  1. /*
  2. Copyright (c) 2016-2017 VMware, Inc. All Rights Reserved.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. package object
  14. import (
  15. "bytes"
  16. "context"
  17. "errors"
  18. "fmt"
  19. "io"
  20. "net/http"
  21. "os"
  22. "path"
  23. "sync"
  24. "time"
  25. "github.com/vmware/govmomi/vim25/soap"
  26. )
  27. // DatastoreFile implements io.Reader, io.Seeker and io.Closer interfaces for datastore file access.
  28. type DatastoreFile struct {
  29. d Datastore
  30. ctx context.Context
  31. name string
  32. buf io.Reader
  33. body io.ReadCloser
  34. length int64
  35. offset struct {
  36. read, seek int64
  37. }
  38. }
  39. // Open opens the named file relative to the Datastore.
  40. func (d Datastore) Open(ctx context.Context, name string) (*DatastoreFile, error) {
  41. return &DatastoreFile{
  42. d: d,
  43. name: name,
  44. length: -1,
  45. ctx: ctx,
  46. }, nil
  47. }
  48. // Read reads up to len(b) bytes from the DatastoreFile.
  49. func (f *DatastoreFile) Read(b []byte) (int, error) {
  50. if f.offset.read != f.offset.seek {
  51. // A Seek() call changed the offset, we need to issue a new GET
  52. _ = f.Close()
  53. f.offset.read = f.offset.seek
  54. } else if f.buf != nil {
  55. // f.buf + f behaves like an io.MultiReader
  56. n, err := f.buf.Read(b)
  57. if err == io.EOF {
  58. f.buf = nil // buffer has been drained
  59. }
  60. if n > 0 {
  61. return n, nil
  62. }
  63. }
  64. body, err := f.get()
  65. if err != nil {
  66. return 0, err
  67. }
  68. n, err := body.Read(b)
  69. f.offset.read += int64(n)
  70. f.offset.seek += int64(n)
  71. return n, err
  72. }
  73. // Close closes the DatastoreFile.
  74. func (f *DatastoreFile) Close() error {
  75. var err error
  76. if f.body != nil {
  77. err = f.body.Close()
  78. f.body = nil
  79. }
  80. f.buf = nil
  81. return err
  82. }
  83. // Seek sets the offset for the next Read on the DatastoreFile.
  84. func (f *DatastoreFile) Seek(offset int64, whence int) (int64, error) {
  85. switch whence {
  86. case io.SeekStart:
  87. case io.SeekCurrent:
  88. offset += f.offset.seek
  89. case io.SeekEnd:
  90. if f.length < 0 {
  91. _, err := f.Stat()
  92. if err != nil {
  93. return 0, err
  94. }
  95. }
  96. offset += f.length
  97. default:
  98. return 0, errors.New("Seek: invalid whence")
  99. }
  100. // allow negative SeekStart for initial Range request
  101. if offset < 0 {
  102. return 0, errors.New("Seek: invalid offset")
  103. }
  104. f.offset.seek = offset
  105. return offset, nil
  106. }
  107. type fileStat struct {
  108. file *DatastoreFile
  109. header http.Header
  110. }
  111. func (s *fileStat) Name() string {
  112. return path.Base(s.file.name)
  113. }
  114. func (s *fileStat) Size() int64 {
  115. return s.file.length
  116. }
  117. func (s *fileStat) Mode() os.FileMode {
  118. return 0
  119. }
  120. func (s *fileStat) ModTime() time.Time {
  121. return time.Now() // no Last-Modified
  122. }
  123. func (s *fileStat) IsDir() bool {
  124. return false
  125. }
  126. func (s *fileStat) Sys() interface{} {
  127. return s.header
  128. }
  129. func statusError(res *http.Response) error {
  130. if res.StatusCode == http.StatusNotFound {
  131. return os.ErrNotExist
  132. }
  133. return errors.New(res.Status)
  134. }
  135. // Stat returns the os.FileInfo interface describing file.
  136. func (f *DatastoreFile) Stat() (os.FileInfo, error) {
  137. // TODO: consider using Datastore.Stat() instead
  138. u, p, err := f.d.downloadTicket(f.ctx, f.name, &soap.Download{Method: "HEAD"})
  139. if err != nil {
  140. return nil, err
  141. }
  142. res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
  143. if err != nil {
  144. return nil, err
  145. }
  146. if res.StatusCode != http.StatusOK {
  147. return nil, statusError(res)
  148. }
  149. f.length = res.ContentLength
  150. return &fileStat{f, res.Header}, nil
  151. }
  152. func (f *DatastoreFile) get() (io.Reader, error) {
  153. if f.body != nil {
  154. return f.body, nil
  155. }
  156. u, p, err := f.d.downloadTicket(f.ctx, f.name, nil)
  157. if err != nil {
  158. return nil, err
  159. }
  160. if f.offset.read != 0 {
  161. p.Headers = map[string]string{
  162. "Range": fmt.Sprintf("bytes=%d-", f.offset.read),
  163. }
  164. }
  165. res, err := f.d.Client().DownloadRequest(f.ctx, u, p)
  166. if err != nil {
  167. return nil, err
  168. }
  169. switch res.StatusCode {
  170. case http.StatusOK:
  171. f.length = res.ContentLength
  172. case http.StatusPartialContent:
  173. var start, end int
  174. cr := res.Header.Get("Content-Range")
  175. _, err = fmt.Sscanf(cr, "bytes %d-%d/%d", &start, &end, &f.length)
  176. if err != nil {
  177. f.length = -1
  178. }
  179. case http.StatusRequestedRangeNotSatisfiable:
  180. // ok: Read() will return io.EOF
  181. default:
  182. return nil, statusError(res)
  183. }
  184. if f.length < 0 {
  185. _ = res.Body.Close()
  186. return nil, errors.New("unable to determine file size")
  187. }
  188. f.body = res.Body
  189. return f.body, nil
  190. }
  191. func lastIndexLines(s []byte, line *int, include func(l int, m string) bool) (int64, bool) {
  192. i := len(s) - 1
  193. done := false
  194. for i > 0 {
  195. o := bytes.LastIndexByte(s[:i], '\n')
  196. if o < 0 {
  197. break
  198. }
  199. msg := string(s[o+1 : i+1])
  200. if !include(*line, msg) {
  201. done = true
  202. break
  203. } else {
  204. i = o
  205. *line++
  206. }
  207. }
  208. return int64(i), done
  209. }
  210. // Tail seeks to the position of the last N lines of the file.
  211. func (f *DatastoreFile) Tail(n int) error {
  212. return f.TailFunc(n, func(line int, _ string) bool { return n > line })
  213. }
  214. // TailFunc will seek backwards in the datastore file until it hits a line that does
  215. // not satisfy the supplied `include` function.
  216. func (f *DatastoreFile) TailFunc(lines int, include func(line int, message string) bool) error {
  217. // Read the file in reverse using bsize chunks
  218. const bsize = int64(1024 * 16)
  219. fsize, err := f.Seek(0, io.SeekEnd)
  220. if err != nil {
  221. return err
  222. }
  223. if lines == 0 {
  224. return nil
  225. }
  226. chunk := int64(-1)
  227. buf := bytes.NewBuffer(make([]byte, 0, bsize))
  228. line := 0
  229. for {
  230. var eof bool
  231. var pos int64
  232. nread := bsize
  233. offset := chunk * bsize
  234. remain := fsize + offset
  235. if remain < 0 {
  236. if pos, err = f.Seek(0, io.SeekStart); err != nil {
  237. return err
  238. }
  239. nread = bsize + remain
  240. eof = true
  241. } else if pos, err = f.Seek(offset, io.SeekEnd); err != nil {
  242. return err
  243. }
  244. if _, err = io.CopyN(buf, f, nread); err != nil {
  245. if err != io.EOF {
  246. return err
  247. }
  248. }
  249. b := buf.Bytes()
  250. idx, done := lastIndexLines(b, &line, include)
  251. if done {
  252. if chunk == -1 {
  253. // We found all N lines in the last chunk of the file.
  254. // The seek offset is also now at the current end of file.
  255. // Save this buffer to avoid another GET request when Read() is called.
  256. buf.Next(int(idx + 1))
  257. f.buf = buf
  258. return nil
  259. }
  260. if _, err = f.Seek(pos+idx+1, io.SeekStart); err != nil {
  261. return err
  262. }
  263. break
  264. }
  265. if eof {
  266. if remain < 0 {
  267. // We found < N lines in the entire file, so seek to the start.
  268. _, _ = f.Seek(0, io.SeekStart)
  269. }
  270. break
  271. }
  272. chunk--
  273. buf.Reset()
  274. }
  275. return nil
  276. }
  277. type followDatastoreFile struct {
  278. r *DatastoreFile
  279. c chan struct{}
  280. i time.Duration
  281. o sync.Once
  282. }
  283. // Read reads up to len(b) bytes from the DatastoreFile being followed.
  284. // This method will block until data is read, an error other than io.EOF is returned or Close() is called.
  285. func (f *followDatastoreFile) Read(p []byte) (int, error) {
  286. offset := f.r.offset.seek
  287. stop := false
  288. for {
  289. n, err := f.r.Read(p)
  290. if err != nil && err == io.EOF {
  291. _ = f.r.Close() // GET request body has been drained.
  292. if stop {
  293. return n, err
  294. }
  295. err = nil
  296. }
  297. if n > 0 {
  298. return n, err
  299. }
  300. select {
  301. case <-f.c:
  302. // Wake up and stop polling once the body has been drained
  303. stop = true
  304. case <-time.After(f.i):
  305. }
  306. info, serr := f.r.Stat()
  307. if serr != nil {
  308. // Return EOF rather than 404 if the file goes away
  309. if serr == os.ErrNotExist {
  310. _ = f.r.Close()
  311. return 0, io.EOF
  312. }
  313. return 0, serr
  314. }
  315. if info.Size() < offset {
  316. // assume file has be truncated
  317. offset, err = f.r.Seek(0, io.SeekStart)
  318. if err != nil {
  319. return 0, err
  320. }
  321. }
  322. }
  323. }
  324. // Close will stop Follow polling and close the underlying DatastoreFile.
  325. func (f *followDatastoreFile) Close() error {
  326. f.o.Do(func() { close(f.c) })
  327. return nil
  328. }
  329. // Follow returns an io.ReadCloser to stream the file contents as data is appended.
  330. func (f *DatastoreFile) Follow(interval time.Duration) io.ReadCloser {
  331. return &followDatastoreFile{
  332. r: f,
  333. c: make(chan struct{}),
  334. i: interval,
  335. }
  336. }