| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337 |
- package sftp
- import (
- "context"
- "errors"
- "io"
- "path"
- "path/filepath"
- "strconv"
- "sync"
- )
// maxTxPacket is a size limit of 32 KiB (32768).
// NOTE(review): presumably the maximum size of a transmitted packet; it is
// not referenced in this part of the file — confirm against its users.
var maxTxPacket = uint32(1 << 15)
- // Handlers contains the 4 SFTP server request handlers.
- type Handlers struct {
- FileGet FileReader
- FilePut FileWriter
- FileCmd FileCmder
- FileList FileLister
- }
- // RequestServer abstracts the sftp protocol with an http request-like protocol
- type RequestServer struct {
- Handlers Handlers
- *serverConn
- pktMgr *packetManager
- startDirectory string
- mu sync.RWMutex
- handleCount int
- openRequests map[string]*Request
- }
- // A RequestServerOption is a function which applies configuration to a RequestServer.
- type RequestServerOption func(*RequestServer)
- // WithRSAllocator enable the allocator.
- // After processing a packet we keep in memory the allocated slices
- // and we reuse them for new packets.
- // The allocator is experimental
- func WithRSAllocator() RequestServerOption {
- return func(rs *RequestServer) {
- alloc := newAllocator()
- rs.pktMgr.alloc = alloc
- rs.conn.alloc = alloc
- }
- }
- // WithStartDirectory sets a start directory to use as base for relative paths.
- // If unset the default is "/"
- func WithStartDirectory(startDirectory string) RequestServerOption {
- return func(rs *RequestServer) {
- rs.startDirectory = cleanPath(startDirectory)
- }
- }
- // NewRequestServer creates/allocates/returns new RequestServer.
- // Normally there will be one server per user-session.
- func NewRequestServer(rwc io.ReadWriteCloser, h Handlers, options ...RequestServerOption) *RequestServer {
- svrConn := &serverConn{
- conn: conn{
- Reader: rwc,
- WriteCloser: rwc,
- },
- }
- rs := &RequestServer{
- Handlers: h,
- serverConn: svrConn,
- pktMgr: newPktMgr(svrConn),
- startDirectory: "/",
- openRequests: make(map[string]*Request),
- }
- for _, o := range options {
- o(rs)
- }
- return rs
- }
- // New Open packet/Request
- func (rs *RequestServer) nextRequest(r *Request) string {
- rs.mu.Lock()
- defer rs.mu.Unlock()
- rs.handleCount++
- r.handle = strconv.Itoa(rs.handleCount)
- rs.openRequests[r.handle] = r
- return r.handle
- }
- // Returns Request from openRequests, bool is false if it is missing.
- //
- // The Requests in openRequests work essentially as open file descriptors that
- // you can do different things with. What you are doing with it are denoted by
- // the first packet of that type (read/write/etc).
- func (rs *RequestServer) getRequest(handle string) (*Request, bool) {
- rs.mu.RLock()
- defer rs.mu.RUnlock()
- r, ok := rs.openRequests[handle]
- return r, ok
- }
- // Close the Request and clear from openRequests map
- func (rs *RequestServer) closeRequest(handle string) error {
- rs.mu.Lock()
- defer rs.mu.Unlock()
- if r, ok := rs.openRequests[handle]; ok {
- delete(rs.openRequests, handle)
- return r.close()
- }
- return EBADF
- }
- // Close the read/write/closer to trigger exiting the main server loop
- func (rs *RequestServer) Close() error { return rs.conn.Close() }
- func (rs *RequestServer) serveLoop(pktChan chan<- orderedRequest) error {
- defer close(pktChan) // shuts down sftpServerWorkers
- var err error
- var pkt requestPacket
- var pktType uint8
- var pktBytes []byte
- for {
- pktType, pktBytes, err = rs.serverConn.recvPacket(rs.pktMgr.getNextOrderID())
- if err != nil {
- // we don't care about releasing allocated pages here, the server will quit and the allocator freed
- return err
- }
- pkt, err = makePacket(rxPacket{fxp(pktType), pktBytes})
- if err != nil {
- switch {
- case errors.Is(err, errUnknownExtendedPacket):
- // do nothing
- default:
- debug("makePacket err: %v", err)
- rs.conn.Close() // shuts down recvPacket
- return err
- }
- }
- pktChan <- rs.pktMgr.newOrderedRequest(pkt)
- }
- }
- // Serve requests for user session
- func (rs *RequestServer) Serve() error {
- defer func() {
- if rs.pktMgr.alloc != nil {
- rs.pktMgr.alloc.Free()
- }
- }()
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- var wg sync.WaitGroup
- runWorker := func(ch chan orderedRequest) {
- wg.Add(1)
- go func() {
- defer wg.Done()
- if err := rs.packetWorker(ctx, ch); err != nil {
- rs.conn.Close() // shuts down recvPacket
- }
- }()
- }
- pktChan := rs.pktMgr.workerChan(runWorker)
- err := rs.serveLoop(pktChan)
- wg.Wait() // wait for all workers to exit
- rs.mu.Lock()
- defer rs.mu.Unlock()
- // make sure all open requests are properly closed
- // (eg. possible on dropped connections, client crashes, etc.)
- for handle, req := range rs.openRequests {
- if err == io.EOF {
- err = io.ErrUnexpectedEOF
- }
- req.transferError(err)
- delete(rs.openRequests, handle)
- req.close()
- }
- return err
- }
- func (rs *RequestServer) packetWorker(ctx context.Context, pktChan chan orderedRequest) error {
- for pkt := range pktChan {
- orderID := pkt.orderID()
- if epkt, ok := pkt.requestPacket.(*sshFxpExtendedPacket); ok {
- if epkt.SpecificPacket != nil {
- pkt.requestPacket = epkt.SpecificPacket
- }
- }
- var rpkt responsePacket
- switch pkt := pkt.requestPacket.(type) {
- case *sshFxInitPacket:
- rpkt = &sshFxVersionPacket{Version: sftpProtocolVersion, Extensions: sftpExtensions}
- case *sshFxpClosePacket:
- handle := pkt.getHandle()
- rpkt = statusFromError(pkt.ID, rs.closeRequest(handle))
- case *sshFxpRealpathPacket:
- var realPath string
- var err error
- switch pather := rs.Handlers.FileList.(type) {
- case RealPathFileLister:
- realPath, err = pather.RealPath(pkt.getPath())
- case legacyRealPathFileLister:
- realPath = pather.RealPath(pkt.getPath())
- default:
- realPath = cleanPathWithBase(rs.startDirectory, pkt.getPath())
- }
- if err != nil {
- rpkt = statusFromError(pkt.ID, err)
- } else {
- rpkt = cleanPacketPath(pkt, realPath)
- }
- case *sshFxpOpendirPacket:
- request := requestFromPacket(ctx, pkt, rs.startDirectory)
- handle := rs.nextRequest(request)
- rpkt = request.opendir(rs.Handlers, pkt)
- if _, ok := rpkt.(*sshFxpHandlePacket); !ok {
- // if we return an error we have to remove the handle from the active ones
- rs.closeRequest(handle)
- }
- case *sshFxpOpenPacket:
- request := requestFromPacket(ctx, pkt, rs.startDirectory)
- handle := rs.nextRequest(request)
- rpkt = request.open(rs.Handlers, pkt)
- if _, ok := rpkt.(*sshFxpHandlePacket); !ok {
- // if we return an error we have to remove the handle from the active ones
- rs.closeRequest(handle)
- }
- case *sshFxpFstatPacket:
- handle := pkt.getHandle()
- request, ok := rs.getRequest(handle)
- if !ok {
- rpkt = statusFromError(pkt.ID, EBADF)
- } else {
- request = &Request{
- Method: "Stat",
- Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
- }
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- }
- case *sshFxpFsetstatPacket:
- handle := pkt.getHandle()
- request, ok := rs.getRequest(handle)
- if !ok {
- rpkt = statusFromError(pkt.ID, EBADF)
- } else {
- request = &Request{
- Method: "Setstat",
- Filepath: cleanPathWithBase(rs.startDirectory, request.Filepath),
- }
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- }
- case *sshFxpExtendedPacketPosixRename:
- request := &Request{
- Method: "PosixRename",
- Filepath: cleanPathWithBase(rs.startDirectory, pkt.Oldpath),
- Target: cleanPathWithBase(rs.startDirectory, pkt.Newpath),
- }
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- case *sshFxpExtendedPacketStatVFS:
- request := &Request{
- Method: "StatVFS",
- Filepath: cleanPathWithBase(rs.startDirectory, pkt.Path),
- }
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- case hasHandle:
- handle := pkt.getHandle()
- request, ok := rs.getRequest(handle)
- if !ok {
- rpkt = statusFromError(pkt.id(), EBADF)
- } else {
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- }
- case hasPath:
- request := requestFromPacket(ctx, pkt, rs.startDirectory)
- rpkt = request.call(rs.Handlers, pkt, rs.pktMgr.alloc, orderID)
- request.close()
- default:
- rpkt = statusFromError(pkt.id(), ErrSSHFxOpUnsupported)
- }
- rs.pktMgr.readyPacket(
- rs.pktMgr.newOrderedResponse(rpkt, orderID))
- }
- return nil
- }
- // clean and return name packet for file
- func cleanPacketPath(pkt *sshFxpRealpathPacket, realPath string) responsePacket {
- return &sshFxpNamePacket{
- ID: pkt.id(),
- NameAttrs: []*sshFxpNameAttr{
- {
- Name: realPath,
- LongName: realPath,
- Attrs: emptyFileStat,
- },
- },
- }
- }
- // Makes sure we have a clean POSIX (/) absolute path to work with
- func cleanPath(p string) string {
- return cleanPathWithBase("/", p)
- }
// cleanPathWithBase cleans p and converts it to slash-separated form.
// If the result is absolute it is returned as is; otherwise it is joined
// onto base.
func cleanPathWithBase(base, p string) string {
	cleaned := filepath.ToSlash(filepath.Clean(p))
	if path.IsAbs(cleaned) {
		return cleaned
	}
	return path.Join(base, cleaned)
}
|