| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
7771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030 |
- package sftp
- import (
- "bytes"
- "encoding/binary"
- "errors"
- "fmt"
- "io"
- "math"
- "os"
- "path"
- "sync"
- "sync/atomic"
- "syscall"
- "time"
- "github.com/kr/fs"
- "golang.org/x/crypto/ssh"
- )
var (
	// ErrInternalInconsistency indicates the packets sent and the data queued to be
	// written to the file don't match up. It is an unusual error and usually is
	// caused by bad behavior server side or connection issues. The error is
	// limited in scope to the call where it happened, the client object is still
	// OK to use as long as the connection is still open.
	ErrInternalInconsistency = errors.New("internal inconsistency")

	// InternalInconsistency alias for ErrInternalInconsistency.
	//
	// Deprecated: please use ErrInternalInconsistency
	InternalInconsistency = ErrInternalInconsistency
)
// A ClientOption is a function which applies configuration to a Client.
// Options are passed to NewClient/NewClientPipe and are applied before the
// protocol handshake; returning a non-nil error aborts client creation.
type ClientOption func(*Client) error
- // MaxPacketChecked sets the maximum size of the payload, measured in bytes.
- // This option only accepts sizes servers should support, ie. <= 32768 bytes.
- //
- // If you get the error "failed to send packet header: EOF" when copying a
- // large file, try lowering this number.
- //
- // The default packet size is 32768 bytes.
- func MaxPacketChecked(size int) ClientOption {
- return func(c *Client) error {
- if size < 1 {
- return errors.New("size must be greater or equal to 1")
- }
- if size > 32768 {
- return errors.New("sizes larger than 32KB might not work with all servers")
- }
- c.maxPacket = size
- return nil
- }
- }
- // MaxPacketUnchecked sets the maximum size of the payload, measured in bytes.
- // It accepts sizes larger than the 32768 bytes all servers should support.
- // Only use a setting higher than 32768 if your application always connects to
- // the same server or after sufficiently broad testing.
- //
- // If you get the error "failed to send packet header: EOF" when copying a
- // large file, try lowering this number.
- //
- // The default packet size is 32768 bytes.
- func MaxPacketUnchecked(size int) ClientOption {
- return func(c *Client) error {
- if size < 1 {
- return errors.New("size must be greater or equal to 1")
- }
- c.maxPacket = size
- return nil
- }
- }
- // MaxPacket sets the maximum size of the payload, measured in bytes.
- // This option only accepts sizes servers should support, ie. <= 32768 bytes.
- // This is a synonym for MaxPacketChecked that provides backward compatibility.
- //
- // If you get the error "failed to send packet header: EOF" when copying a
- // large file, try lowering this number.
- //
- // The default packet size is 32768 bytes.
- func MaxPacket(size int) ClientOption {
- return MaxPacketChecked(size)
- }
- // MaxConcurrentRequestsPerFile sets the maximum concurrent requests allowed for a single file.
- //
- // The default maximum concurrent requests is 64.
- func MaxConcurrentRequestsPerFile(n int) ClientOption {
- return func(c *Client) error {
- if n < 1 {
- return errors.New("n must be greater or equal to 1")
- }
- c.maxConcurrentRequests = n
- return nil
- }
- }
- // UseConcurrentWrites allows the Client to perform concurrent Writes.
- //
- // Using concurrency while doing writes, requires special consideration.
- // A write to a later offset in a file after an error,
- // could end up with a file length longer than what was successfully written.
- //
- // When using this option, if you receive an error during `io.Copy` or `io.WriteTo`,
- // you may need to `Truncate` the target Writer to avoid “holes” in the data written.
- func UseConcurrentWrites(value bool) ClientOption {
- return func(c *Client) error {
- c.useConcurrentWrites = value
- return nil
- }
- }
- // UseConcurrentReads allows the Client to perform concurrent Reads.
- //
- // Concurrent reads are generally safe to use and not using them will degrade
- // performance, so this option is enabled by default.
- //
- // When enabled, WriteTo will use Stat/Fstat to get the file size and determines
- // how many concurrent workers to use.
- // Some "read once" servers will delete the file if they receive a stat call on an
- // open file and then the download will fail.
- // Disabling concurrent reads you will be able to download files from these servers.
- // If concurrent reads are disabled, the UseFstat option is ignored.
- func UseConcurrentReads(value bool) ClientOption {
- return func(c *Client) error {
- c.disableConcurrentReads = !value
- return nil
- }
- }
- // UseFstat sets whether to use Fstat or Stat when File.WriteTo is called
- // (usually when copying files).
- // Some servers limit the amount of open files and calling Stat after opening
- // the file will throw an error From the server. Setting this flag will call
- // Fstat instead of Stat which is suppose to be called on an open file handle.
- //
- // It has been found that that with IBM Sterling SFTP servers which have
- // "extractability" level set to 1 which means only 1 file can be opened at
- // any given time.
- //
- // If the server you are working with still has an issue with both Stat and
- // Fstat calls you can always open a file and read it until the end.
- //
- // Another reason to read the file until its end and Fstat doesn't work is
- // that in some servers, reading a full file will automatically delete the
- // file as some of these mainframes map the file to a message in a queue.
- // Once the file has been read it will get deleted.
- func UseFstat(value bool) ClientOption {
- return func(c *Client) error {
- c.useFstat = value
- return nil
- }
- }
// Client represents an SFTP session on a *ssh.ClientConn SSH connection.
// Multiple Clients can be active on a single SSH connection, and a Client
// may be called concurrently from multiple Goroutines.
//
// Client implements the github.com/kr/fs.FileSystem interface.
type Client struct {
	clientConn

	ext map[string]string // Extensions (name -> data), as advertised in the server's version packet.

	maxPacket             int    // max packet size read or written.
	maxConcurrentRequests int    // see MaxConcurrentRequestsPerFile; defaults to 64.
	nextid                uint32 // last issued request ID; advanced atomically via nextID().

	// write concurrency is… error prone.
	// Default behavior should be to not use it.
	useConcurrentWrites    bool
	useFstat               bool // prefer Fstat over Stat (see UseFstat).
	disableConcurrentReads bool // inverted form of the UseConcurrentReads option.
}
- // NewClient creates a new SFTP client on conn, using zero or more option
- // functions.
- func NewClient(conn *ssh.Client, opts ...ClientOption) (*Client, error) {
- s, err := conn.NewSession()
- if err != nil {
- return nil, err
- }
- if err := s.RequestSubsystem("sftp"); err != nil {
- return nil, err
- }
- pw, err := s.StdinPipe()
- if err != nil {
- return nil, err
- }
- pr, err := s.StdoutPipe()
- if err != nil {
- return nil, err
- }
- return NewClientPipe(pr, pw, opts...)
- }
// NewClientPipe creates a new SFTP client given a Reader and a WriteCloser.
// This can be used for connecting to an SFTP server over TCP/TLS or by using
// the system's ssh client program (e.g. via exec.Command).
//
// On any setup failure the WriteCloser is closed before returning.
func NewClientPipe(rd io.Reader, wr io.WriteCloser, opts ...ClientOption) (*Client, error) {
	sftp := &Client{
		clientConn: clientConn{
			conn: conn{
				Reader:      rd,
				WriteCloser: wr,
			},
			inflight: make(map[uint32]chan<- result),
			closed:   make(chan struct{}),
		},

		ext: make(map[string]string),

		maxPacket:             1 << 15, // 32768 bytes, the size all servers should support.
		maxConcurrentRequests: 64,
	}

	// Apply options before any packets are exchanged, so they can affect
	// the init/version handshake below.
	for _, opt := range opts {
		if err := opt(sftp); err != nil {
			wr.Close()
			return nil, err
		}
	}

	if err := sftp.sendInit(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error sending init packet to server: %w", err)
	}
	if err := sftp.recvVersion(); err != nil {
		wr.Close()
		return nil, fmt.Errorf("error receiving version packet from server: %w", err)
	}

	// Start the receiver goroutine that dispatches responses to in-flight
	// requests; any receive error is broadcast to all pending callers.
	sftp.clientConn.wg.Add(1)
	go func() {
		defer sftp.clientConn.wg.Done()
		if err := sftp.clientConn.recv(); err != nil {
			sftp.clientConn.broadcastErr(err)
		}
	}()

	return sftp, nil
}
- // Create creates the named file mode 0666 (before umask), truncating it if it
- // already exists. If successful, methods on the returned File can be used for
- // I/O; the associated file descriptor has mode O_RDWR. If you need more
- // control over the flags/mode used to open the file see client.OpenFile.
- //
- // Note that some SFTP servers (eg. AWS Transfer) do not support opening files
- // read/write at the same time. For those services you will need to use
- // `client.OpenFile(os.O_WRONLY|os.O_CREATE|os.O_TRUNC)`.
- func (c *Client) Create(path string) (*File, error) {
- return c.open(path, flags(os.O_RDWR|os.O_CREATE|os.O_TRUNC))
- }
// sftpProtocolVersion is the protocol version this client negotiates.
const sftpProtocolVersion = 3 // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt

// sendInit sends the SSH_FXP_INIT packet that begins the protocol handshake.
func (c *Client) sendInit() error {
	return c.clientConn.conn.sendPacket(&sshFxInitPacket{
		Version: sftpProtocolVersion, // https://filezilla-project.org/specs/draft-ietf-secsh-filexfer-02.txt
	})
}
// nextID returns the next value of c.nextid.
// The atomic increment makes IDs unique across concurrent callers.
func (c *Client) nextID() uint32 {
	return atomic.AddUint32(&c.nextid, 1)
}
// recvVersion reads the server's SSH_FXP_VERSION reply to our init packet,
// checks that the negotiated protocol version matches sftpProtocolVersion,
// and records any advertised extension pairs into c.ext.
func (c *Client) recvVersion() error {
	typ, data, err := c.recvPacket(0)
	if err != nil {
		if err == io.EOF {
			// A clean EOF here means the server hung up mid-handshake.
			return fmt.Errorf("server unexpectedly closed connection: %w", io.ErrUnexpectedEOF)
		}
		return err
	}

	if typ != sshFxpVersion {
		return &unexpectedPacketErr{sshFxpVersion, typ}
	}

	version, data, err := unmarshalUint32Safe(data)
	if err != nil {
		return err
	}

	if version != sftpProtocolVersion {
		return &unexpectedVersionErr{sftpProtocolVersion, version}
	}

	// Any remaining bytes are name/data extension pairs.
	for len(data) > 0 {
		var ext extensionPair
		ext, data, err = unmarshalExtensionPair(data)
		if err != nil {
			return err
		}
		c.ext[ext.Name] = ext.Data
	}

	return nil
}
- // HasExtension checks whether the server supports a named extension.
- //
- // The first return value is the extension data reported by the server
- // (typically a version number).
- func (c *Client) HasExtension(name string) (string, bool) {
- data, ok := c.ext[name]
- return data, ok
- }
- // Walk returns a new Walker rooted at root.
- func (c *Client) Walk(root string) *fs.Walker {
- return fs.WalkFS(root, c)
- }
- // ReadDir reads the directory named by dirname and returns a list of
- // directory entries.
- func (c *Client) ReadDir(p string) ([]os.FileInfo, error) {
- handle, err := c.opendir(p)
- if err != nil {
- return nil, err
- }
- defer c.close(handle) // this has to defer earlier than the lock below
- var attrs []os.FileInfo
- var done = false
- for !done {
- id := c.nextID()
- typ, data, err1 := c.sendPacket(nil, &sshFxpReaddirPacket{
- ID: id,
- Handle: handle,
- })
- if err1 != nil {
- err = err1
- done = true
- break
- }
- switch typ {
- case sshFxpName:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return nil, &unexpectedIDErr{id, sid}
- }
- count, data := unmarshalUint32(data)
- for i := uint32(0); i < count; i++ {
- var filename string
- filename, data = unmarshalString(data)
- _, data = unmarshalString(data) // discard longname
- var attr *FileStat
- attr, data = unmarshalAttrs(data)
- if filename == "." || filename == ".." {
- continue
- }
- attrs = append(attrs, fileInfoFromStat(attr, path.Base(filename)))
- }
- case sshFxpStatus:
- // TODO(dfc) scope warning!
- err = normaliseError(unmarshalStatus(id, data))
- done = true
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- if err == io.EOF {
- err = nil
- }
- return attrs, err
- }
- func (c *Client) opendir(path string) (string, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpOpendirPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return "", err
- }
- switch typ {
- case sshFxpHandle:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return "", &unexpectedIDErr{id, sid}
- }
- handle, _ := unmarshalString(data)
- return handle, nil
- case sshFxpStatus:
- return "", normaliseError(unmarshalStatus(id, data))
- default:
- return "", unimplementedPacketErr(typ)
- }
- }
- // Stat returns a FileInfo structure describing the file specified by path 'p'.
- // If 'p' is a symbolic link, the returned FileInfo structure describes the referent file.
- func (c *Client) Stat(p string) (os.FileInfo, error) {
- fs, err := c.stat(p)
- if err != nil {
- return nil, err
- }
- return fileInfoFromStat(fs, path.Base(p)), nil
- }
- // Lstat returns a FileInfo structure describing the file specified by path 'p'.
- // If 'p' is a symbolic link, the returned FileInfo structure describes the symbolic link.
- func (c *Client) Lstat(p string) (os.FileInfo, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpLstatPacket{
- ID: id,
- Path: p,
- })
- if err != nil {
- return nil, err
- }
- switch typ {
- case sshFxpAttrs:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return nil, &unexpectedIDErr{id, sid}
- }
- attr, _ := unmarshalAttrs(data)
- return fileInfoFromStat(attr, path.Base(p)), nil
- case sshFxpStatus:
- return nil, normaliseError(unmarshalStatus(id, data))
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- // ReadLink reads the target of a symbolic link.
- func (c *Client) ReadLink(p string) (string, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpReadlinkPacket{
- ID: id,
- Path: p,
- })
- if err != nil {
- return "", err
- }
- switch typ {
- case sshFxpName:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return "", &unexpectedIDErr{id, sid}
- }
- count, data := unmarshalUint32(data)
- if count != 1 {
- return "", unexpectedCount(1, count)
- }
- filename, _ := unmarshalString(data) // ignore dummy attributes
- return filename, nil
- case sshFxpStatus:
- return "", normaliseError(unmarshalStatus(id, data))
- default:
- return "", unimplementedPacketErr(typ)
- }
- }
- // Link creates a hard link at 'newname', pointing at the same inode as 'oldname'
- func (c *Client) Link(oldname, newname string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpHardlinkPacket{
- ID: id,
- Oldpath: oldname,
- Newpath: newname,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // Symlink creates a symbolic link at 'newname', pointing at target 'oldname'
- func (c *Client) Symlink(oldname, newname string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpSymlinkPacket{
- ID: id,
- Linkpath: newname,
- Targetpath: oldname,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- func (c *Client) setfstat(handle string, flags uint32, attrs interface{}) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpFsetstatPacket{
- ID: id,
- Handle: handle,
- Flags: flags,
- Attrs: attrs,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // setstat is a convience wrapper to allow for changing of various parts of the file descriptor.
- func (c *Client) setstat(path string, flags uint32, attrs interface{}) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpSetstatPacket{
- ID: id,
- Path: path,
- Flags: flags,
- Attrs: attrs,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // Chtimes changes the access and modification times of the named file.
- func (c *Client) Chtimes(path string, atime time.Time, mtime time.Time) error {
- type times struct {
- Atime uint32
- Mtime uint32
- }
- attrs := times{uint32(atime.Unix()), uint32(mtime.Unix())}
- return c.setstat(path, sshFileXferAttrACmodTime, attrs)
- }
- // Chown changes the user and group owners of the named file.
- func (c *Client) Chown(path string, uid, gid int) error {
- type owner struct {
- UID uint32
- GID uint32
- }
- attrs := owner{uint32(uid), uint32(gid)}
- return c.setstat(path, sshFileXferAttrUIDGID, attrs)
- }
- // Chmod changes the permissions of the named file.
- //
- // Chmod does not apply a umask, because even retrieving the umask is not
- // possible in a portable way without causing a race condition. Callers
- // should mask off umask bits, if desired.
- func (c *Client) Chmod(path string, mode os.FileMode) error {
- return c.setstat(path, sshFileXferAttrPermissions, toChmodPerm(mode))
- }
- // Truncate sets the size of the named file. Although it may be safely assumed
- // that if the size is less than its current size it will be truncated to fit,
- // the SFTP protocol does not specify what behavior the server should do when setting
- // size greater than the current size.
- func (c *Client) Truncate(path string, size int64) error {
- return c.setstat(path, sshFileXferAttrSize, uint64(size))
- }
- // Open opens the named file for reading. If successful, methods on the
- // returned file can be used for reading; the associated file descriptor
- // has mode O_RDONLY.
- func (c *Client) Open(path string) (*File, error) {
- return c.open(path, flags(os.O_RDONLY))
- }
- // OpenFile is the generalized open call; most users will use Open or
- // Create instead. It opens the named file with specified flag (O_RDONLY
- // etc.). If successful, methods on the returned File can be used for I/O.
- func (c *Client) OpenFile(path string, f int) (*File, error) {
- return c.open(path, flags(f))
- }
- func (c *Client) open(path string, pflags uint32) (*File, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpOpenPacket{
- ID: id,
- Path: path,
- Pflags: pflags,
- })
- if err != nil {
- return nil, err
- }
- switch typ {
- case sshFxpHandle:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return nil, &unexpectedIDErr{id, sid}
- }
- handle, _ := unmarshalString(data)
- return &File{c: c, path: path, handle: handle}, nil
- case sshFxpStatus:
- return nil, normaliseError(unmarshalStatus(id, data))
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- // close closes a handle handle previously returned in the response
- // to SSH_FXP_OPEN or SSH_FXP_OPENDIR. The handle becomes invalid
- // immediately after this request has been sent.
- func (c *Client) close(handle string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpClosePacket{
- ID: id,
- Handle: handle,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- func (c *Client) stat(path string) (*FileStat, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpStatPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return nil, err
- }
- switch typ {
- case sshFxpAttrs:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return nil, &unexpectedIDErr{id, sid}
- }
- attr, _ := unmarshalAttrs(data)
- return attr, nil
- case sshFxpStatus:
- return nil, normaliseError(unmarshalStatus(id, data))
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- func (c *Client) fstat(handle string) (*FileStat, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpFstatPacket{
- ID: id,
- Handle: handle,
- })
- if err != nil {
- return nil, err
- }
- switch typ {
- case sshFxpAttrs:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return nil, &unexpectedIDErr{id, sid}
- }
- attr, _ := unmarshalAttrs(data)
- return attr, nil
- case sshFxpStatus:
- return nil, normaliseError(unmarshalStatus(id, data))
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- // StatVFS retrieves VFS statistics from a remote host.
- //
- // It implements the statvfs@openssh.com SSH_FXP_EXTENDED feature
- // from http://www.opensource.apple.com/source/OpenSSH/OpenSSH-175/openssh/PROTOCOL?txt.
- func (c *Client) StatVFS(path string) (*StatVFS, error) {
- // send the StatVFS packet to the server
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpStatvfsPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return nil, err
- }
- switch typ {
- // server responded with valid data
- case sshFxpExtendedReply:
- var response StatVFS
- err = binary.Read(bytes.NewReader(data), binary.BigEndian, &response)
- if err != nil {
- return nil, errors.New("can not parse reply")
- }
- return &response, nil
- // the resquest failed
- case sshFxpStatus:
- return nil, normaliseError(unmarshalStatus(id, data))
- default:
- return nil, unimplementedPacketErr(typ)
- }
- }
- // Join joins any number of path elements into a single path, adding a
- // separating slash if necessary. The result is Cleaned; in particular, all
- // empty strings are ignored.
- func (c *Client) Join(elem ...string) string { return path.Join(elem...) }
- // Remove removes the specified file or directory. An error will be returned if no
- // file or directory with the specified path exists, or if the specified directory
- // is not empty.
- func (c *Client) Remove(path string) error {
- err := c.removeFile(path)
- // some servers, *cough* osx *cough*, return EPERM, not ENODIR.
- // serv-u returns ssh_FX_FILE_IS_A_DIRECTORY
- // EPERM is converted to os.ErrPermission so it is not a StatusError
- if err, ok := err.(*StatusError); ok {
- switch err.Code {
- case sshFxFailure, sshFxFileIsADirectory:
- return c.RemoveDirectory(path)
- }
- }
- if os.IsPermission(err) {
- return c.RemoveDirectory(path)
- }
- return err
- }
- func (c *Client) removeFile(path string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpRemovePacket{
- ID: id,
- Filename: path,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // RemoveDirectory removes a directory path.
- func (c *Client) RemoveDirectory(path string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpRmdirPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // Rename renames a file.
- func (c *Client) Rename(oldname, newname string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpRenamePacket{
- ID: id,
- Oldpath: oldname,
- Newpath: newname,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // PosixRename renames a file using the posix-rename@openssh.com extension
- // which will replace newname if it already exists.
- func (c *Client) PosixRename(oldname, newname string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpPosixRenamePacket{
- ID: id,
- Oldpath: oldname,
- Newpath: newname,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
- // RealPath can be used to have the server canonicalize any given path name to an absolute path.
- //
- // This is useful for converting path names containing ".." components,
- // or relative pathnames without a leading slash into absolute paths.
- func (c *Client) RealPath(path string) (string, error) {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpRealpathPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return "", err
- }
- switch typ {
- case sshFxpName:
- sid, data := unmarshalUint32(data)
- if sid != id {
- return "", &unexpectedIDErr{id, sid}
- }
- count, data := unmarshalUint32(data)
- if count != 1 {
- return "", unexpectedCount(1, count)
- }
- filename, _ := unmarshalString(data) // ignore attributes
- return filename, nil
- case sshFxpStatus:
- return "", normaliseError(unmarshalStatus(id, data))
- default:
- return "", unimplementedPacketErr(typ)
- }
- }
- // Getwd returns the current working directory of the server. Operations
- // involving relative paths will be based at this location.
- func (c *Client) Getwd() (string, error) {
- return c.RealPath(".")
- }
- // Mkdir creates the specified directory. An error will be returned if a file or
- // directory with the specified path already exists, or if the directory's
- // parent folder does not exist (the method cannot create complete paths).
- func (c *Client) Mkdir(path string) error {
- id := c.nextID()
- typ, data, err := c.sendPacket(nil, &sshFxpMkdirPacket{
- ID: id,
- Path: path,
- })
- if err != nil {
- return err
- }
- switch typ {
- case sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return unimplementedPacketErr(typ)
- }
- }
// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
// If path is already a directory, MkdirAll does nothing and returns nil.
// If path contains a regular file, an error is returned
func (c *Client) MkdirAll(path string) error {
	// Most of this code mimics https://golang.org/src/os/path.go?s=514:561#L13
	// Fast path: if we can tell whether path is a directory or file, stop with success or error.
	dir, err := c.Stat(path)
	if err == nil {
		if dir.IsDir() {
			return nil
		}
		return &os.PathError{Op: "mkdir", Path: path, Err: syscall.ENOTDIR}
	}

	// Slow path: make sure parent exists and then call Mkdir for path.

	// Find the parent element boundary: i marks the end of the last path
	// element, j the start of it.
	i := len(path)
	for i > 0 && path[i-1] == '/' { // Skip trailing path separator.
		i--
	}

	j := i
	for j > 0 && path[j-1] != '/' { // Scan backward over element.
		j--
	}

	if j > 1 {
		// Create parent (everything before the final element), recursively.
		err = c.MkdirAll(path[0 : j-1])
		if err != nil {
			return err
		}
	}

	// Parent now exists; invoke Mkdir and use its result.
	err = c.Mkdir(path)
	if err != nil {
		// Handle arguments like "foo/." by
		// double-checking that directory doesn't exist.
		dir, err1 := c.Lstat(path)
		if err1 == nil && dir.IsDir() {
			return nil
		}
		return err
	}
	return nil
}
- // RemoveAll delete files recursively in the directory and Recursively delete subdirectories.
- // An error will be returned if no file or directory with the specified path exists
- func (c *Client) RemoveAll(path string) error {
- // Get the file/directory information
- fi, err := c.Stat(path)
- if err != nil {
- return err
- }
- if fi.IsDir() {
- // Delete files recursively in the directory
- files, err := c.ReadDir(path)
- if err != nil {
- return err
- }
- for _, file := range files {
- if file.IsDir() {
- // Recursively delete subdirectories
- err = c.RemoveAll(path + "/" + file.Name())
- if err != nil {
- return err
- }
- } else {
- // Delete individual files
- err = c.Remove(path + "/" + file.Name())
- if err != nil {
- return err
- }
- }
- }
- }
- return c.Remove(path)
- }
// File represents a remote file.
type File struct {
	c      *Client // connection the file was opened on
	path   string  // name as presented to Open or Create
	handle string  // server-issued handle identifying the open file

	mu     sync.Mutex // guards offset across Read/Write/Seek/ReadFrom/WriteTo
	offset int64      // current offset within remote file
}
// Close closes the File, rendering it unusable for I/O. It returns an
// error, if any.
func (f *File) Close() error {
	// Releases the server-side handle for this file.
	return f.c.close(f.handle)
}
// Name returns the name of the file as presented to Open or Create.
func (f *File) Name() string {
	return f.path
}
// Read reads up to len(b) bytes from the File. It returns the number of bytes
// read and an error, if any. Read follows io.Reader semantics, so when Read
// encounters an error or EOF condition after successfully reading n > 0 bytes,
// it returns the number of bytes read.
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use WriteTo rather
// than calling Read multiple times. io.Copy will do this
// automatically.
func (f *File) Read(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Delegate to ReadAt at the tracked offset, then advance the offset
	// by however many bytes were actually read (even on error).
	n, err := f.ReadAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}
// readChunkAt attempts to read the whole entire length of the buffer from the file starting at the offset.
// It will continue progressively reading into the buffer until it fills the whole buffer, or an error occurs.
//
// ch, when non-nil, is a reusable result channel passed through to sendPacket.
func (f *File) readChunkAt(ch chan result, b []byte, off int64) (n int, err error) {
	// NOTE: the := below shadows the named return err, so the loop
	// condition only guards re-entry; all exits happen via return.
	for err == nil && n < len(b) {
		// Issue one SSH_FXP_READ for the bytes still missing.
		id := f.c.nextID()
		typ, data, err := f.c.sendPacket(ch, &sshFxpReadPacket{
			ID:     id,
			Handle: f.handle,
			Offset: uint64(off) + uint64(n),
			Len:    uint32(len(b) - n),
		})
		if err != nil {
			return n, err
		}

		switch typ {
		case sshFxpStatus:
			// Status here is an error (EOF included) — normalise and stop.
			return n, normaliseError(unmarshalStatus(id, data))

		case sshFxpData:
			sid, data := unmarshalUint32(data)
			if id != sid {
				return n, &unexpectedIDErr{id, sid}
			}

			// Append this packet's payload; the server may return fewer
			// bytes than requested, hence the loop.
			l, data := unmarshalUint32(data)
			n += copy(b[n:], data[:l])

		default:
			return n, unimplementedPacketErr(typ)
		}
	}

	return
}
- func (f *File) readAtSequential(b []byte, off int64) (read int, err error) {
- for read < len(b) {
- rb := b[read:]
- if len(rb) > f.c.maxPacket {
- rb = rb[:f.c.maxPacket]
- }
- n, err := f.readChunkAt(nil, rb, off+int64(read))
- if n < 0 {
- panic("sftp.File: returned negative count from readChunkAt")
- }
- if n > 0 {
- read += n
- }
- if err != nil {
- return read, err
- }
- }
- return read, nil
- }
// ReadAt reads up to len(b) byte from the File at a given offset `off`. It returns
// the number of bytes read and an error, if any. ReadAt follows io.ReaderAt semantics,
// so the file offset is not altered during the read.
func (f *File) ReadAt(b []byte, off int64) (int, error) {
	if len(b) <= f.c.maxPacket {
		// This should be able to be serviced with 1/2 requests.
		// So, just do it directly.
		return f.readChunkAt(nil, b, off)
	}

	if f.c.disableConcurrentReads {
		return f.readAtSequential(b, off)
	}

	// Split the read into multiple maxPacket-sized concurrent reads bounded by maxConcurrentRequests.
	// This allows writes with a suitably large buffer to transfer data at a much faster rate
	// by overlapping round trip times.

	cancel := make(chan struct{})

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	resPool := newResChanPool(concurrency)

	type work struct {
		id  uint32
		res chan result

		b   []byte
		off int64
	}
	workCh := make(chan work)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		b := b
		offset := off
		chunkSize := f.c.maxPacket

		for len(b) > 0 {
			rb := b
			if len(rb) > chunkSize {
				rb = rb[:chunkSize]
			}

			id := f.c.nextID()
			res := resPool.Get()

			// The request is dispatched before the work item is handed
			// off, so requests are in flight while workers drain replies.
			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(offset),
				Len:    uint32(chunkSize),
			})

			select {
			case workCh <- work{id, res, rb, offset}:
			case <-cancel:
				return
			}

			offset += int64(len(rb))
			b = b[len(rb):]
		}
	}()

	type rErr struct {
		off int64
		err error
	}
	errCh := make(chan rErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and then performs the Read into its buffer from its respective offset.
		go func() {
			defer wg.Done()

			for packet := range workCh {
				var n int

				s := <-packet.res
				resPool.Put(packet.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(packet.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if packet.id != sid {
							err = &unexpectedIDErr{packet.id, sid}

						} else {
							l, data := unmarshalUint32(data)
							n = copy(packet.b, data[:l])

							// For normal disk files, it is guaranteed that this will read
							// the specified number of bytes, or up to end of file.
							// This implies, if we have a short read, that means EOF.
							if n < len(packet.b) {
								err = io.EOF
							}
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					// return the offset as the start + how much we read before the error.
					errCh <- rErr{packet.off + int64(n), err}
					return
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rErr{math.MaxInt64, nil}
	for rErr := range errCh {
		if rErr.off <= firstErr.off {
			firstErr = rErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off > our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	// As per spec for io.ReaderAt, we return nil error if and only if we read everything.
	return len(b), nil
}
- // writeToSequential implements WriteTo, but works sequentially with no parallelism.
- func (f *File) writeToSequential(w io.Writer) (written int64, err error) {
- b := make([]byte, f.c.maxPacket)
- ch := make(chan result, 1) // reusable channel
- for {
- n, err := f.readChunkAt(ch, b, f.offset)
- if n < 0 {
- panic("sftp.File: returned negative count from readChunkAt")
- }
- if n > 0 {
- f.offset += int64(n)
- m, err := w.Write(b[:n])
- written += int64(m)
- if err != nil {
- return written, err
- }
- }
- if err != nil {
- if err == io.EOF {
- return written, nil // return nil explicitly.
- }
- return written, err
- }
- }
- }
// WriteTo writes the file to the given Writer.
// The return value is the number of bytes written.
// Any error encountered during the write is also returned.
//
// This method is preferred over calling Read multiple times
// to maximise throughput for transferring the entire file,
// especially over high latency links.
func (f *File) WriteTo(w io.Writer) (written int64, err error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.disableConcurrentReads {
		return f.writeToSequential(w)
	}

	// For concurrency, we want to guess how many concurrent workers we should use.
	var fileStat *FileStat
	if f.c.useFstat {
		fileStat, err = f.c.fstat(f.handle)
	} else {
		fileStat, err = f.c.stat(f.path)
	}
	if err != nil {
		return 0, err
	}

	fileSize := fileStat.Size
	if fileSize <= uint64(f.c.maxPacket) || !isRegular(fileStat.Mode) {
		// only regular files are guaranteed to return (full read) xor (partial read, next error)
		return f.writeToSequential(w)
	}

	concurrency64 := fileSize/uint64(f.c.maxPacket) + 1 // a bad guess, but better than no guess
	if concurrency64 > uint64(f.c.maxConcurrentRequests) || concurrency64 < 1 {
		concurrency64 = uint64(f.c.maxConcurrentRequests)
	}
	// Now that concurrency64 is saturated to an int value, we know this assignment cannot possibly overflow.
	concurrency := int(concurrency64)

	chunkSize := f.c.maxPacket
	pool := newBufPool(concurrency, chunkSize)
	resPool := newResChanPool(concurrency)

	cancel := make(chan struct{})
	var wg sync.WaitGroup
	defer func() {
		// Once the writing Reduce phase has ended, all the feed work needs to unconditionally stop.
		close(cancel)

		// We want to wait until all outstanding goroutines with an `f` or `f.c` reference have completed.
		// Just to be sure we don’t orphan any goroutines any hanging references.
		wg.Wait()
	}()

	// writeWork is one chunk's result; `next` chains the chunks so the
	// Reduce loop can consume them strictly in file order.
	type writeWork struct {
		b   []byte
		off int64
		err error

		next chan writeWork
	}
	writeCh := make(chan writeWork)

	type readWork struct {
		id  uint32
		res chan result
		off int64

		cur, next chan writeWork
	}
	readCh := make(chan readWork)

	// Slice: hand out chunks of work on demand, with a `cur` and `next` channel built-in for sequencing.
	go func() {
		defer close(readCh)

		off := f.offset

		cur := writeCh
		for {
			id := f.c.nextID()
			res := resPool.Get()

			next := make(chan writeWork)
			readWork := readWork{
				id:  id,
				res: res,
				off: off,

				cur:  cur,
				next: next,
			}

			f.c.dispatchRequest(res, &sshFxpReadPacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Len:    uint32(chunkSize),
			})

			select {
			case readCh <- readWork:
			case <-cancel:
				return
			}

			off += int64(chunkSize)
			cur = next
		}
	}()

	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets readWork, and does the Read into a buffer at the given offset.
		go func() {
			defer wg.Done()

			for readWork := range readCh {
				var b []byte
				var n int

				s := <-readWork.res
				resPool.Put(readWork.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(readWork.id, s.data))

					case sshFxpData:
						sid, data := unmarshalUint32(s.data)
						if readWork.id != sid {
							err = &unexpectedIDErr{readWork.id, sid}

						} else {
							// Copy into a pooled buffer, truncated to the
							// bytes actually received.
							l, data := unmarshalUint32(data)
							b = pool.Get()[:l]
							n = copy(b, data[:l])
							b = b[:n]
						}

					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				writeWork := writeWork{
					b:   b,
					off: readWork.off,
					err: err,

					next: readWork.next,
				}

				select {
				case readWork.cur <- writeWork:
				case <-cancel:
					return
				}

				if err != nil {
					return
				}
			}
		}()
	}

	// Reduce: serialize the results from the reads into sequential writes.
	cur := writeCh
	for {
		packet, ok := <-cur
		if !ok {
			return written, errors.New("sftp.File.WriteTo: unexpectedly closed channel")
		}

		// Because writes are serialized, this will always be the last successfully read byte.
		f.offset = packet.off + int64(len(packet.b))

		if len(packet.b) > 0 {
			n, err := w.Write(packet.b)
			written += int64(n)
			if err != nil {
				return written, err
			}
		}

		if packet.err != nil {
			if packet.err == io.EOF {
				return written, nil
			}

			return written, packet.err
		}

		// Only recycle the buffer on the success path; error paths return above.
		pool.Put(packet.b)
		cur = packet.next
	}
}
// Stat returns the FileInfo structure describing the file. If there is an
// error, the returned FileInfo is nil.
func (f *File) Stat() (os.FileInfo, error) {
	// Uses the open handle (SSH_FXP_FSTAT) rather than the path.
	fs, err := f.c.fstat(f.handle)
	if err != nil {
		return nil, err
	}
	return fileInfoFromStat(fs, path.Base(f.path)), nil
}
// Write writes len(b) bytes to the File. It returns the number of bytes
// written and an error, if any. Write returns a non-nil error when n !=
// len(b).
//
// To maximise throughput for transferring the entire file (especially
// over high latency links) it is recommended to use ReadFrom rather
// than calling Write multiple times. io.Copy will do this
// automatically.
func (f *File) Write(b []byte) (int, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	// Delegate to WriteAt at the tracked offset, then advance the offset
	// by however many bytes were actually written (even on error).
	n, err := f.WriteAt(b, f.offset)
	f.offset += int64(n)
	return n, err
}
- func (f *File) writeChunkAt(ch chan result, b []byte, off int64) (int, error) {
- typ, data, err := f.c.sendPacket(ch, &sshFxpWritePacket{
- ID: f.c.nextID(),
- Handle: f.handle,
- Offset: uint64(off),
- Length: uint32(len(b)),
- Data: b,
- })
- if err != nil {
- return 0, err
- }
- switch typ {
- case sshFxpStatus:
- id, _ := unmarshalUint32(data)
- err := normaliseError(unmarshalStatus(id, data))
- if err != nil {
- return 0, err
- }
- default:
- return 0, unimplementedPacketErr(typ)
- }
- return len(b), nil
- }
// writeAtConcurrent implements WriterAt, but works concurrently rather than sequentially.
func (f *File) writeAtConcurrent(b []byte, off int64) (int, error) {
	// Split the write into multiple maxPacket sized concurrent writes
	// bounded by maxConcurrentRequests. This allows writes with a suitably
	// large buffer to transfer data at a much faster rate due to
	// overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	concurrency := len(b)/f.c.maxPacket + 1
	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		var read int
		chunkSize := f.c.maxPacket

		for read < len(b) {
			wb := b[read:]
			if len(wb) > chunkSize {
				wb = wb[:chunkSize]
			}

			id := f.c.nextID()
			res := pool.Get()
			off := off + int64(read)

			// Dispatch before handing off, so requests are in flight
			// while workers are still draining earlier replies.
			f.c.dispatchRequest(res, &sshFxpWritePacket{
				ID:     id,
				Handle: f.handle,
				Offset: uint64(off),
				Length: uint32(len(wb)),
				Data:   wb,
			})

			select {
			case workCh <- work{id, res, off}:
			case <-cancel:
				return
			}

			read += len(wb)
		}
	}()

	type wErr struct {
		off int64
		err error
	}
	errCh := make(chan wErr)

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- wErr{work.off, err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := wErr{math.MaxInt64, nil}
	for wErr := range errCh {
		if wErr.off <= firstErr.off {
			firstErr = wErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed. (Just in case.)
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off >= our starting offset.
		return int(firstErr.off - off), firstErr.err
	}

	return len(b), nil
}
- // WriteAt writes up to len(b) byte to the File at a given offset `off`. It returns
- // the number of bytes written and an error, if any. WriteAt follows io.WriterAt semantics,
- // so the file offset is not altered during the write.
- func (f *File) WriteAt(b []byte, off int64) (written int, err error) {
- if len(b) <= f.c.maxPacket {
- // We can do this in one write.
- return f.writeChunkAt(nil, b, off)
- }
- if f.c.useConcurrentWrites {
- return f.writeAtConcurrent(b, off)
- }
- ch := make(chan result, 1) // reusable channel
- chunkSize := f.c.maxPacket
- for written < len(b) {
- wb := b[written:]
- if len(wb) > chunkSize {
- wb = wb[:chunkSize]
- }
- n, err := f.writeChunkAt(ch, wb, off+int64(written))
- if n > 0 {
- written += n
- }
- if err != nil {
- return written, err
- }
- }
- return len(b), nil
- }
// ReadFromWithConcurrency implements ReaderFrom,
// but uses the given concurrency to issue multiple requests at the same time.
//
// Giving a concurrency of less than one will default to the Client’s max concurrency.
//
// Otherwise, the given concurrency will be capped by the Client's max concurrency.
func (f *File) ReadFromWithConcurrency(r io.Reader, concurrency int) (read int64, err error) {
	// Split the write into multiple maxPacket sized concurrent writes.
	// This allows writes with a suitably large reader
	// to transfer data at a much faster rate due to overlapping round trip times.

	cancel := make(chan struct{})

	type work struct {
		id  uint32
		res chan result

		off int64
	}
	workCh := make(chan work)

	type rwErr struct {
		off int64
		err error
	}
	errCh := make(chan rwErr)

	if concurrency > f.c.maxConcurrentRequests || concurrency < 1 {
		concurrency = f.c.maxConcurrentRequests
	}

	pool := newResChanPool(concurrency)

	// Slice: cut up the Read into any number of buffers of length <= f.c.maxPacket, and at appropriate offsets.
	go func() {
		defer close(workCh)

		// A single read buffer is reused for each packet: dispatchRequest
		// marshals the data before the next r.Read overwrites it.
		b := make([]byte, f.c.maxPacket)
		off := f.offset

		for {
			n, err := r.Read(b)

			if n > 0 {
				read += int64(n)

				id := f.c.nextID()
				res := pool.Get()

				f.c.dispatchRequest(res, &sshFxpWritePacket{
					ID:     id,
					Handle: f.handle,
					Offset: uint64(off),
					Length: uint32(n),
					Data:   b[:n],
				})

				select {
				case workCh <- work{id, res, off}:
				case <-cancel:
					return
				}

				off += int64(n)
			}

			if err != nil {
				// io.EOF is the normal end of input; anything else is
				// reported with the offset it occurred at.
				if err != io.EOF {
					errCh <- rwErr{off, err}
				}
				return
			}
		}
	}()

	var wg sync.WaitGroup
	wg.Add(concurrency)
	for i := 0; i < concurrency; i++ {
		// Map_i: each worker gets work, and does the Write from each buffer to its respective offset.
		go func() {
			defer wg.Done()

			for work := range workCh {
				s := <-work.res
				pool.Put(work.res)

				err := s.err
				if err == nil {
					switch s.typ {
					case sshFxpStatus:
						err = normaliseError(unmarshalStatus(work.id, s.data))
					default:
						err = unimplementedPacketErr(s.typ)
					}
				}

				if err != nil {
					errCh <- rwErr{work.off, err}
				}
			}
		}()
	}

	// Wait for long tail, before closing results.
	go func() {
		wg.Wait()
		close(errCh)
	}()

	// Reduce: Collect all the results into a relevant return: the earliest offset to return an error.
	firstErr := rwErr{math.MaxInt64, nil}
	for rwErr := range errCh {
		if rwErr.off <= firstErr.off {
			firstErr = rwErr
		}

		select {
		case <-cancel:
		default:
			// stop any more work from being distributed.
			close(cancel)
		}
	}

	if firstErr.err != nil {
		// firstErr.err != nil if and only if firstErr.off is a valid offset.
		//
		// firstErr.off will then be the lesser of:
		// * the offset of the first error from writing,
		// * the last successfully read offset.
		//
		// This could be less than the last successfully written offset,
		// which is the whole reason for the UseConcurrentWrites() ClientOption.
		//
		// Callers are responsible for truncating any SFTP files to a safe length.
		f.offset = firstErr.off

		// ReadFrom is defined to return the read bytes, regardless of any writer errors.
		return read, firstErr.err
	}

	f.offset += read
	return read, nil
}
// ReadFrom reads data from r until EOF and writes it to the file. The return
// value is the number of bytes read. Any error except io.EOF encountered
// during the read is also returned.
//
// This method is preferred over calling Write multiple times
// to maximise throughput for transferring the entire file,
// especially over high-latency links.
func (f *File) ReadFrom(r io.Reader) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	if f.c.useConcurrentWrites {
		// Probe the reader for a size hint to pick a concurrency level.
		var remain int64
		switch r := r.(type) {
		case interface{ Len() int }:
			remain = int64(r.Len())
		case interface{ Size() int64 }:
			remain = r.Size()
		case *io.LimitedReader:
			remain = r.N
		case interface{ Stat() (os.FileInfo, error) }:
			info, err := r.Stat()
			if err == nil {
				remain = info.Size()
			}
		}

		if remain < 0 {
			// We can strongly assert that we want default max concurrency here.
			// (A negative remain can only come from a reader reporting a
			// negative size, e.g. an io.LimitedReader with N < 0.)
			return f.ReadFromWithConcurrency(r, f.c.maxConcurrentRequests)
		}

		if remain > int64(f.c.maxPacket) {
			// Otherwise, only use concurrency, if it would be at least two packets.

			// This is the best reasonable guess we can make.
			concurrency64 := remain/int64(f.c.maxPacket) + 1

			// We need to cap this value to an `int` size value to avoid overflow on 32-bit machines.
			// So, we may as well pre-cap it to `f.c.maxConcurrentRequests`.
			if concurrency64 > int64(f.c.maxConcurrentRequests) {
				concurrency64 = int64(f.c.maxConcurrentRequests)
			}

			return f.ReadFromWithConcurrency(r, int(concurrency64))
		}
	}

	// Sequential path: one chunk read, one chunk written, repeat.
	ch := make(chan result, 1) // reusable channel

	b := make([]byte, f.c.maxPacket)

	var read int64
	for {
		n, err := r.Read(b)
		if n < 0 {
			panic("sftp.File: reader returned negative count from Read")
		}

		if n > 0 {
			read += int64(n)

			m, err2 := f.writeChunkAt(ch, b[:n], f.offset)
			f.offset += int64(m)

			// A read error is reported in preference to a write error.
			if err == nil {
				err = err2
			}
		}

		if err != nil {
			if err == io.EOF {
				return read, nil // return nil explicitly.
			}

			return read, err
		}
	}
}
// Seek implements io.Seeker by setting the client offset for the next Read or
// Write. It returns the new offset. Seeking before or after the end of
// the file is undefined. Seeking relative to the end calls Stat.
func (f *File) Seek(offset int64, whence int) (int64, error) {
	f.mu.Lock()
	defer f.mu.Unlock()

	switch whence {
	case io.SeekStart:
		// offset is already absolute.
	case io.SeekCurrent:
		offset += f.offset
	case io.SeekEnd:
		// Resolving the end of the file requires a round trip to the server.
		fi, err := f.Stat()
		if err != nil {
			return f.offset, err
		}
		offset += fi.Size()
	default:
		return f.offset, unimplementedSeekWhence(whence)
	}

	if offset < 0 {
		// A negative absolute offset is invalid; leave f.offset untouched.
		return f.offset, os.ErrInvalid
	}

	f.offset = offset
	return f.offset, nil
}
// Chown changes the uid/gid of the current file.
func (f *File) Chown(uid, gid int) error {
	// Delegates to the path-based Client.Chown rather than using the handle.
	return f.c.Chown(f.path, uid, gid)
}
// Chmod changes the permissions of the current file.
//
// See Client.Chmod for details.
func (f *File) Chmod(mode os.FileMode) error {
	// Uses the open handle (SSH_FXP_FSETSTAT) with only the permissions attribute set.
	return f.c.setfstat(f.handle, sshFileXferAttrPermissions, toChmodPerm(mode))
}
- // Sync requests a flush of the contents of a File to stable storage.
- //
- // Sync requires the server to support the fsync@openssh.com extension.
- func (f *File) Sync() error {
- id := f.c.nextID()
- typ, data, err := f.c.sendPacket(nil, &sshFxpFsyncPacket{
- ID: id,
- Handle: f.handle,
- })
- switch {
- case err != nil:
- return err
- case typ == sshFxpStatus:
- return normaliseError(unmarshalStatus(id, data))
- default:
- return &unexpectedPacketErr{want: sshFxpStatus, got: typ}
- }
- }
// Truncate sets the size of the current file. Although it may be safely assumed
// that if the size is less than its current size it will be truncated to fit,
// the SFTP protocol does not specify what behavior the server should do when setting
// size greater than the current size.
// We send a SSH_FXP_FSETSTAT here since we have a file handle
func (f *File) Truncate(size int64) error {
	return f.c.setfstat(f.handle, sshFileXferAttrSize, uint64(size))
}
- // normaliseError normalises an error into a more standard form that can be
- // checked against stdlib errors like io.EOF or os.ErrNotExist.
- func normaliseError(err error) error {
- switch err := err.(type) {
- case *StatusError:
- switch err.Code {
- case sshFxEOF:
- return io.EOF
- case sshFxNoSuchFile:
- return os.ErrNotExist
- case sshFxPermissionDenied:
- return os.ErrPermission
- case sshFxOk:
- return nil
- default:
- return err
- }
- default:
- return err
- }
- }
- // flags converts the flags passed to OpenFile into ssh flags.
- // Unsupported flags are ignored.
- func flags(f int) uint32 {
- var out uint32
- switch f & os.O_WRONLY {
- case os.O_WRONLY:
- out |= sshFxfWrite
- case os.O_RDONLY:
- out |= sshFxfRead
- }
- if f&os.O_RDWR == os.O_RDWR {
- out |= sshFxfRead | sshFxfWrite
- }
- if f&os.O_APPEND == os.O_APPEND {
- out |= sshFxfAppend
- }
- if f&os.O_CREATE == os.O_CREATE {
- out |= sshFxfCreat
- }
- if f&os.O_TRUNC == os.O_TRUNC {
- out |= sshFxfTrunc
- }
- if f&os.O_EXCL == os.O_EXCL {
- out |= sshFxfExcl
- }
- return out
- }
- // toChmodPerm converts Go permission bits to POSIX permission bits.
- //
- // This differs from fromFileMode in that we preserve the POSIX versions of
- // setuid, setgid and sticky in m, because we've historically supported those
- // bits, and we mask off any non-permission bits.
- func toChmodPerm(m os.FileMode) (perm uint32) {
- const mask = os.ModePerm | s_ISUID | s_ISGID | s_ISVTX
- perm = uint32(m & mask)
- if m&os.ModeSetuid != 0 {
- perm |= s_ISUID
- }
- if m&os.ModeSetgid != 0 {
- perm |= s_ISGID
- }
- if m&os.ModeSticky != 0 {
- perm |= s_ISVTX
- }
- return perm
- }
|