request.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653
  1. package sftp
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "sync"
  10. "syscall"
  11. )
  12. // MaxFilelist is the max number of files to return in a readdir batch.
  13. var MaxFilelist int64 = 100
  14. // state encapsulates the reader/writer/readdir from handlers.
  15. type state struct {
  16. mu sync.RWMutex
  17. writerAt io.WriterAt
  18. readerAt io.ReaderAt
  19. writerAtReaderAt WriterAtReaderAt
  20. listerAt ListerAt
  21. lsoffset int64
  22. }
  23. // copy returns a shallow copy the state.
  24. // This is broken out to specific fields,
  25. // because we have to copy around the mutex in state.
  26. func (s *state) copy() state {
  27. s.mu.RLock()
  28. defer s.mu.RUnlock()
  29. return state{
  30. writerAt: s.writerAt,
  31. readerAt: s.readerAt,
  32. writerAtReaderAt: s.writerAtReaderAt,
  33. listerAt: s.listerAt,
  34. lsoffset: s.lsoffset,
  35. }
  36. }
  37. func (s *state) setReaderAt(rd io.ReaderAt) {
  38. s.mu.Lock()
  39. defer s.mu.Unlock()
  40. s.readerAt = rd
  41. }
  42. func (s *state) getReaderAt() io.ReaderAt {
  43. s.mu.RLock()
  44. defer s.mu.RUnlock()
  45. return s.readerAt
  46. }
  47. func (s *state) setWriterAt(rd io.WriterAt) {
  48. s.mu.Lock()
  49. defer s.mu.Unlock()
  50. s.writerAt = rd
  51. }
  52. func (s *state) getWriterAt() io.WriterAt {
  53. s.mu.RLock()
  54. defer s.mu.RUnlock()
  55. return s.writerAt
  56. }
  57. func (s *state) setWriterAtReaderAt(rw WriterAtReaderAt) {
  58. s.mu.Lock()
  59. defer s.mu.Unlock()
  60. s.writerAtReaderAt = rw
  61. }
  62. func (s *state) getWriterAtReaderAt() WriterAtReaderAt {
  63. s.mu.RLock()
  64. defer s.mu.RUnlock()
  65. return s.writerAtReaderAt
  66. }
  67. func (s *state) getAllReaderWriters() (io.ReaderAt, io.WriterAt, WriterAtReaderAt) {
  68. s.mu.RLock()
  69. defer s.mu.RUnlock()
  70. return s.readerAt, s.writerAt, s.writerAtReaderAt
  71. }
  72. // Returns current offset for file list
  73. func (s *state) lsNext() int64 {
  74. s.mu.RLock()
  75. defer s.mu.RUnlock()
  76. return s.lsoffset
  77. }
  78. // Increases next offset
  79. func (s *state) lsInc(offset int64) {
  80. s.mu.Lock()
  81. defer s.mu.Unlock()
  82. s.lsoffset += offset
  83. }
  84. // manage file read/write state
  85. func (s *state) setListerAt(la ListerAt) {
  86. s.mu.Lock()
  87. defer s.mu.Unlock()
  88. s.listerAt = la
  89. }
  90. func (s *state) getListerAt() ListerAt {
  91. s.mu.RLock()
  92. defer s.mu.RUnlock()
  93. return s.listerAt
  94. }
  95. // Request contains the data and state for the incoming service request.
  96. type Request struct {
  97. // Get, Put, Setstat, Stat, Rename, Remove
  98. // Rmdir, Mkdir, List, Readlink, Link, Symlink
  99. Method string
  100. Filepath string
  101. Flags uint32
  102. Attrs []byte // convert to sub-struct
  103. Target string // for renames and sym-links
  104. handle string
  105. // reader/writer/readdir from handlers
  106. state
  107. // context lasts duration of request
  108. ctx context.Context
  109. cancelCtx context.CancelFunc
  110. }
  111. // NewRequest creates a new Request object.
  112. func NewRequest(method, path string) *Request {
  113. return &Request{
  114. Method: method,
  115. Filepath: cleanPath(path),
  116. }
  117. }
  118. // copy returns a shallow copy of existing request.
  119. // This is broken out to specific fields,
  120. // because we have to copy around the mutex in state.
  121. func (r *Request) copy() *Request {
  122. return &Request{
  123. Method: r.Method,
  124. Filepath: r.Filepath,
  125. Flags: r.Flags,
  126. Attrs: r.Attrs,
  127. Target: r.Target,
  128. handle: r.handle,
  129. state: r.state.copy(),
  130. ctx: r.ctx,
  131. cancelCtx: r.cancelCtx,
  132. }
  133. }
  134. // New Request initialized based on packet data
  135. func requestFromPacket(ctx context.Context, pkt hasPath, baseDir string) *Request {
  136. request := &Request{
  137. Method: requestMethod(pkt),
  138. Filepath: cleanPathWithBase(baseDir, pkt.getPath()),
  139. }
  140. request.ctx, request.cancelCtx = context.WithCancel(ctx)
  141. switch p := pkt.(type) {
  142. case *sshFxpOpenPacket:
  143. request.Flags = p.Pflags
  144. case *sshFxpSetstatPacket:
  145. request.Flags = p.Flags
  146. request.Attrs = p.Attrs.([]byte)
  147. case *sshFxpRenamePacket:
  148. request.Target = cleanPathWithBase(baseDir, p.Newpath)
  149. case *sshFxpSymlinkPacket:
  150. // NOTE: given a POSIX compliant signature: symlink(target, linkpath string)
  151. // this makes Request.Target the linkpath, and Request.Filepath the target.
  152. request.Target = cleanPathWithBase(baseDir, p.Linkpath)
  153. request.Filepath = p.Targetpath
  154. case *sshFxpExtendedPacketHardlink:
  155. request.Target = cleanPathWithBase(baseDir, p.Newpath)
  156. }
  157. return request
  158. }
  159. // Context returns the request's context. To change the context,
  160. // use WithContext.
  161. //
  162. // The returned context is always non-nil; it defaults to the
  163. // background context.
  164. //
  165. // For incoming server requests, the context is canceled when the
  166. // request is complete or the client's connection closes.
  167. func (r *Request) Context() context.Context {
  168. if r.ctx != nil {
  169. return r.ctx
  170. }
  171. return context.Background()
  172. }
  173. // WithContext returns a copy of r with its context changed to ctx.
  174. // The provided ctx must be non-nil.
  175. func (r *Request) WithContext(ctx context.Context) *Request {
  176. if ctx == nil {
  177. panic("nil context")
  178. }
  179. r2 := r.copy()
  180. r2.ctx = ctx
  181. r2.cancelCtx = nil
  182. return r2
  183. }
  184. // Close reader/writer if possible
  185. func (r *Request) close() error {
  186. defer func() {
  187. if r.cancelCtx != nil {
  188. r.cancelCtx()
  189. }
  190. }()
  191. rd, wr, rw := r.getAllReaderWriters()
  192. var err error
  193. // Close errors on a Writer are far more likely to be the important one.
  194. // As they can be information that there was a loss of data.
  195. if c, ok := wr.(io.Closer); ok {
  196. if err2 := c.Close(); err == nil {
  197. // update error if it is still nil
  198. err = err2
  199. }
  200. }
  201. if c, ok := rw.(io.Closer); ok {
  202. if err2 := c.Close(); err == nil {
  203. // update error if it is still nil
  204. err = err2
  205. r.setWriterAtReaderAt(nil)
  206. }
  207. }
  208. if c, ok := rd.(io.Closer); ok {
  209. if err2 := c.Close(); err == nil {
  210. // update error if it is still nil
  211. err = err2
  212. }
  213. }
  214. return err
  215. }
  216. // Notify transfer error if any
  217. func (r *Request) transferError(err error) {
  218. if err == nil {
  219. return
  220. }
  221. rd, wr, rw := r.getAllReaderWriters()
  222. if t, ok := wr.(TransferError); ok {
  223. t.TransferError(err)
  224. }
  225. if t, ok := rw.(TransferError); ok {
  226. t.TransferError(err)
  227. }
  228. if t, ok := rd.(TransferError); ok {
  229. t.TransferError(err)
  230. }
  231. }
  232. // called from worker to handle packet/request
  233. func (r *Request) call(handlers Handlers, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
  234. switch r.Method {
  235. case "Get":
  236. return fileget(handlers.FileGet, r, pkt, alloc, orderID)
  237. case "Put":
  238. return fileput(handlers.FilePut, r, pkt, alloc, orderID)
  239. case "Open":
  240. return fileputget(handlers.FilePut, r, pkt, alloc, orderID)
  241. case "Setstat", "Rename", "Rmdir", "Mkdir", "Link", "Symlink", "Remove", "PosixRename", "StatVFS":
  242. return filecmd(handlers.FileCmd, r, pkt)
  243. case "List":
  244. return filelist(handlers.FileList, r, pkt)
  245. case "Stat", "Lstat":
  246. return filestat(handlers.FileList, r, pkt)
  247. case "Readlink":
  248. if readlinkFileLister, ok := handlers.FileList.(ReadlinkFileLister); ok {
  249. return readlink(readlinkFileLister, r, pkt)
  250. }
  251. return filestat(handlers.FileList, r, pkt)
  252. default:
  253. return statusFromError(pkt.id(), fmt.Errorf("unexpected method: %s", r.Method))
  254. }
  255. }
  256. // Additional initialization for Open packets
  257. func (r *Request) open(h Handlers, pkt requestPacket) responsePacket {
  258. flags := r.Pflags()
  259. id := pkt.id()
  260. switch {
  261. case flags.Write, flags.Append, flags.Creat, flags.Trunc:
  262. if flags.Read {
  263. if openFileWriter, ok := h.FilePut.(OpenFileWriter); ok {
  264. r.Method = "Open"
  265. rw, err := openFileWriter.OpenFile(r)
  266. if err != nil {
  267. return statusFromError(id, err)
  268. }
  269. r.setWriterAtReaderAt(rw)
  270. return &sshFxpHandlePacket{
  271. ID: id,
  272. Handle: r.handle,
  273. }
  274. }
  275. }
  276. r.Method = "Put"
  277. wr, err := h.FilePut.Filewrite(r)
  278. if err != nil {
  279. return statusFromError(id, err)
  280. }
  281. r.setWriterAt(wr)
  282. case flags.Read:
  283. r.Method = "Get"
  284. rd, err := h.FileGet.Fileread(r)
  285. if err != nil {
  286. return statusFromError(id, err)
  287. }
  288. r.setReaderAt(rd)
  289. default:
  290. return statusFromError(id, errors.New("bad file flags"))
  291. }
  292. return &sshFxpHandlePacket{
  293. ID: id,
  294. Handle: r.handle,
  295. }
  296. }
  297. func (r *Request) opendir(h Handlers, pkt requestPacket) responsePacket {
  298. r.Method = "List"
  299. la, err := h.FileList.Filelist(r)
  300. if err != nil {
  301. return statusFromError(pkt.id(), wrapPathError(r.Filepath, err))
  302. }
  303. r.setListerAt(la)
  304. return &sshFxpHandlePacket{
  305. ID: pkt.id(),
  306. Handle: r.handle,
  307. }
  308. }
  309. // wrap FileReader handler
  310. func fileget(h FileReader, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
  311. rd := r.getReaderAt()
  312. if rd == nil {
  313. return statusFromError(pkt.id(), errors.New("unexpected read packet"))
  314. }
  315. data, offset, _ := packetData(pkt, alloc, orderID)
  316. n, err := rd.ReadAt(data, offset)
  317. // only return EOF error if no data left to read
  318. if err != nil && (err != io.EOF || n == 0) {
  319. return statusFromError(pkt.id(), err)
  320. }
  321. return &sshFxpDataPacket{
  322. ID: pkt.id(),
  323. Length: uint32(n),
  324. Data: data[:n],
  325. }
  326. }
  327. // wrap FileWriter handler
  328. func fileput(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
  329. wr := r.getWriterAt()
  330. if wr == nil {
  331. return statusFromError(pkt.id(), errors.New("unexpected write packet"))
  332. }
  333. data, offset, _ := packetData(pkt, alloc, orderID)
  334. _, err := wr.WriteAt(data, offset)
  335. return statusFromError(pkt.id(), err)
  336. }
  337. // wrap OpenFileWriter handler
  338. func fileputget(h FileWriter, r *Request, pkt requestPacket, alloc *allocator, orderID uint32) responsePacket {
  339. rw := r.getWriterAtReaderAt()
  340. if rw == nil {
  341. return statusFromError(pkt.id(), errors.New("unexpected write and read packet"))
  342. }
  343. switch p := pkt.(type) {
  344. case *sshFxpReadPacket:
  345. data, offset := p.getDataSlice(alloc, orderID), int64(p.Offset)
  346. n, err := rw.ReadAt(data, offset)
  347. // only return EOF error if no data left to read
  348. if err != nil && (err != io.EOF || n == 0) {
  349. return statusFromError(pkt.id(), err)
  350. }
  351. return &sshFxpDataPacket{
  352. ID: pkt.id(),
  353. Length: uint32(n),
  354. Data: data[:n],
  355. }
  356. case *sshFxpWritePacket:
  357. data, offset := p.Data, int64(p.Offset)
  358. _, err := rw.WriteAt(data, offset)
  359. return statusFromError(pkt.id(), err)
  360. default:
  361. return statusFromError(pkt.id(), errors.New("unexpected packet type for read or write"))
  362. }
  363. }
  364. // file data for additional read/write packets
  365. func packetData(p requestPacket, alloc *allocator, orderID uint32) (data []byte, offset int64, length uint32) {
  366. switch p := p.(type) {
  367. case *sshFxpReadPacket:
  368. return p.getDataSlice(alloc, orderID), int64(p.Offset), p.Len
  369. case *sshFxpWritePacket:
  370. return p.Data, int64(p.Offset), p.Length
  371. }
  372. return
  373. }
  374. // wrap FileCmder handler
  375. func filecmd(h FileCmder, r *Request, pkt requestPacket) responsePacket {
  376. switch p := pkt.(type) {
  377. case *sshFxpFsetstatPacket:
  378. r.Flags = p.Flags
  379. r.Attrs = p.Attrs.([]byte)
  380. }
  381. switch r.Method {
  382. case "PosixRename":
  383. if posixRenamer, ok := h.(PosixRenameFileCmder); ok {
  384. err := posixRenamer.PosixRename(r)
  385. return statusFromError(pkt.id(), err)
  386. }
  387. // PosixRenameFileCmder not implemented handle this request as a Rename
  388. r.Method = "Rename"
  389. err := h.Filecmd(r)
  390. return statusFromError(pkt.id(), err)
  391. case "StatVFS":
  392. if statVFSCmdr, ok := h.(StatVFSFileCmder); ok {
  393. stat, err := statVFSCmdr.StatVFS(r)
  394. if err != nil {
  395. return statusFromError(pkt.id(), err)
  396. }
  397. stat.ID = pkt.id()
  398. return stat
  399. }
  400. return statusFromError(pkt.id(), ErrSSHFxOpUnsupported)
  401. }
  402. err := h.Filecmd(r)
  403. return statusFromError(pkt.id(), err)
  404. }
  405. // wrap FileLister handler
  406. func filelist(h FileLister, r *Request, pkt requestPacket) responsePacket {
  407. lister := r.getListerAt()
  408. if lister == nil {
  409. return statusFromError(pkt.id(), errors.New("unexpected dir packet"))
  410. }
  411. offset := r.lsNext()
  412. finfo := make([]os.FileInfo, MaxFilelist)
  413. n, err := lister.ListAt(finfo, offset)
  414. r.lsInc(int64(n))
  415. // ignore EOF as we only return it when there are no results
  416. finfo = finfo[:n] // avoid need for nil tests below
  417. switch r.Method {
  418. case "List":
  419. if err != nil && (err != io.EOF || n == 0) {
  420. return statusFromError(pkt.id(), err)
  421. }
  422. nameAttrs := make([]*sshFxpNameAttr, 0, len(finfo))
  423. // If the type conversion fails, we get untyped `nil`,
  424. // which is handled by not looking up any names.
  425. idLookup, _ := h.(NameLookupFileLister)
  426. for _, fi := range finfo {
  427. nameAttrs = append(nameAttrs, &sshFxpNameAttr{
  428. Name: fi.Name(),
  429. LongName: runLs(idLookup, fi),
  430. Attrs: []interface{}{fi},
  431. })
  432. }
  433. return &sshFxpNamePacket{
  434. ID: pkt.id(),
  435. NameAttrs: nameAttrs,
  436. }
  437. default:
  438. err = fmt.Errorf("unexpected method: %s", r.Method)
  439. return statusFromError(pkt.id(), err)
  440. }
  441. }
  442. func filestat(h FileLister, r *Request, pkt requestPacket) responsePacket {
  443. var lister ListerAt
  444. var err error
  445. if r.Method == "Lstat" {
  446. if lstatFileLister, ok := h.(LstatFileLister); ok {
  447. lister, err = lstatFileLister.Lstat(r)
  448. } else {
  449. // LstatFileLister not implemented handle this request as a Stat
  450. r.Method = "Stat"
  451. lister, err = h.Filelist(r)
  452. }
  453. } else {
  454. lister, err = h.Filelist(r)
  455. }
  456. if err != nil {
  457. return statusFromError(pkt.id(), err)
  458. }
  459. finfo := make([]os.FileInfo, 1)
  460. n, err := lister.ListAt(finfo, 0)
  461. finfo = finfo[:n] // avoid need for nil tests below
  462. switch r.Method {
  463. case "Stat", "Lstat":
  464. if err != nil && err != io.EOF {
  465. return statusFromError(pkt.id(), err)
  466. }
  467. if n == 0 {
  468. err = &os.PathError{
  469. Op: strings.ToLower(r.Method),
  470. Path: r.Filepath,
  471. Err: syscall.ENOENT,
  472. }
  473. return statusFromError(pkt.id(), err)
  474. }
  475. return &sshFxpStatResponse{
  476. ID: pkt.id(),
  477. info: finfo[0],
  478. }
  479. case "Readlink":
  480. if err != nil && err != io.EOF {
  481. return statusFromError(pkt.id(), err)
  482. }
  483. if n == 0 {
  484. err = &os.PathError{
  485. Op: "readlink",
  486. Path: r.Filepath,
  487. Err: syscall.ENOENT,
  488. }
  489. return statusFromError(pkt.id(), err)
  490. }
  491. filename := finfo[0].Name()
  492. return &sshFxpNamePacket{
  493. ID: pkt.id(),
  494. NameAttrs: []*sshFxpNameAttr{
  495. {
  496. Name: filename,
  497. LongName: filename,
  498. Attrs: emptyFileStat,
  499. },
  500. },
  501. }
  502. default:
  503. err = fmt.Errorf("unexpected method: %s", r.Method)
  504. return statusFromError(pkt.id(), err)
  505. }
  506. }
  507. func readlink(readlinkFileLister ReadlinkFileLister, r *Request, pkt requestPacket) responsePacket {
  508. resolved, err := readlinkFileLister.Readlink(r.Filepath)
  509. if err != nil {
  510. return statusFromError(pkt.id(), err)
  511. }
  512. return &sshFxpNamePacket{
  513. ID: pkt.id(),
  514. NameAttrs: []*sshFxpNameAttr{
  515. {
  516. Name: resolved,
  517. LongName: resolved,
  518. Attrs: emptyFileStat,
  519. },
  520. },
  521. }
  522. }
  523. // init attributes of request object from packet data
  524. func requestMethod(p requestPacket) (method string) {
  525. switch p.(type) {
  526. case *sshFxpReadPacket, *sshFxpWritePacket, *sshFxpOpenPacket:
  527. // set in open() above
  528. case *sshFxpOpendirPacket, *sshFxpReaddirPacket:
  529. // set in opendir() above
  530. case *sshFxpSetstatPacket, *sshFxpFsetstatPacket:
  531. method = "Setstat"
  532. case *sshFxpRenamePacket:
  533. method = "Rename"
  534. case *sshFxpSymlinkPacket:
  535. method = "Symlink"
  536. case *sshFxpRemovePacket:
  537. method = "Remove"
  538. case *sshFxpStatPacket, *sshFxpFstatPacket:
  539. method = "Stat"
  540. case *sshFxpLstatPacket:
  541. method = "Lstat"
  542. case *sshFxpRmdirPacket:
  543. method = "Rmdir"
  544. case *sshFxpReadlinkPacket:
  545. method = "Readlink"
  546. case *sshFxpMkdirPacket:
  547. method = "Mkdir"
  548. case *sshFxpExtendedPacketHardlink:
  549. method = "Link"
  550. }
  551. return method
  552. }