| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444 |
- package cos
- import (
- "bytes"
- "context"
- "encoding/binary"
- "encoding/xml"
- "fmt"
- "hash/crc32"
- "io"
- "io/ioutil"
- "net/http"
- "os"
- "time"
- )
// JSONInputSerialization holds the JSON-specific input settings that can be
// attached to SelectInputSerialization.
type JSONInputSerialization struct {
	// Type is written as the <Type> element; it is omitted when empty.
	Type string `xml:"Type,omitempty"`
}
// CSVInputSerialization holds the CSV-specific input settings that can be
// attached to SelectInputSerialization. Every field is a plain string that is
// written as an XML element of the same name and left out when empty.
type CSVInputSerialization struct {
	RecordDelimiter            string `xml:"RecordDelimiter,omitempty"`
	FieldDelimiter             string `xml:"FieldDelimiter,omitempty"`
	QuoteCharacter             string `xml:"QuoteCharacter,omitempty"`
	QuoteEscapeCharacter       string `xml:"QuoteEscapeCharacter,omitempty"`
	AllowQuotedRecordDelimiter string `xml:"AllowQuotedRecordDelimiter,omitempty"`
	FileHeaderInfo             string `xml:"FileHeaderInfo,omitempty"`
	Comments                   string `xml:"Comments,omitempty"`
}
- type SelectInputSerialization struct {
- CompressionType string `xml:"CompressionType,omitempty"`
- CSV *CSVInputSerialization `xml:"CSV,omitempty"`
- JSON *JSONInputSerialization `xml:"JSON,omitempty"`
- }
// JSONOutputSerialization holds the JSON-specific output settings that can be
// attached to SelectOutputSerialization.
type JSONOutputSerialization struct {
	// RecordDelimiter is written as the <RecordDelimiter> element; it is
	// omitted when empty.
	RecordDelimiter string `xml:"RecordDelimiter,omitempty"`
}
// CSVOutputSerialization holds the CSV-specific output settings that can be
// attached to SelectOutputSerialization. Every field is a plain string that is
// written as an XML element of the same name and left out when empty.
type CSVOutputSerialization struct {
	QuoteFields          string `xml:"QuoteFields,omitempty"`
	RecordDelimiter      string `xml:"RecordDelimiter,omitempty"`
	FieldDelimiter       string `xml:"FieldDelimiter,omitempty"`
	QuoteCharacter       string `xml:"QuoteCharacter,omitempty"`
	QuoteEscapeCharacter string `xml:"QuoteEscapeCharacter,omitempty"`
}
- type SelectOutputSerialization struct {
- CSV *CSVOutputSerialization `xml:"CSV,omitempty"`
- JSON *JSONOutputSerialization `xml:"JSON,omitempty"`
- }
- type ObjectSelectOptions struct {
- XMLName xml.Name `xml:"SelectRequest"`
- Expression string `xml:"Expression"`
- ExpressionType string `xml:"ExpressionType"`
- InputSerialization *SelectInputSerialization `xml:"InputSerialization"`
- OutputSerialization *SelectOutputSerialization `xml:"OutputSerialization"`
- RequestProgress string `xml:"RequestProgress>Enabled,omitempty"`
- }
- func (s *ObjectService) Select(ctx context.Context, name string, opt *ObjectSelectOptions) (io.ReadCloser, error) {
- u := fmt.Sprintf("/%s?select&select-type=2", encodeURIComponent(name))
- sendOpt := sendOptions{
- baseURL: s.client.BaseURL.BucketURL,
- uri: u,
- method: http.MethodPost,
- body: opt,
- disableCloseBody: true,
- }
- resp, err := s.client.send(ctx, &sendOpt)
- if err != nil {
- return nil, err
- }
- result := &ObjectSelectResponse{
- Headers: resp.Header,
- Body: resp.Body,
- StatusCode: resp.StatusCode,
- Frame: &ObjectSelectResult{
- NextFrame: true,
- Payload: []byte{},
- },
- Finish: false,
- }
- return result, nil
- }
- func (s *ObjectService) SelectToFile(ctx context.Context, name, file string, opt *ObjectSelectOptions) (*ObjectSelectResponse, error) {
- resp, err := s.Select(ctx, name, opt)
- if err != nil {
- return nil, err
- }
- res, _ := resp.(*ObjectSelectResponse)
- defer func() {
- io.Copy(ioutil.Discard, resp)
- resp.Close()
- }()
- fd, err := os.OpenFile(file, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, os.FileMode(0664))
- if err != nil {
- return res, err
- }
- _, err = io.Copy(fd, resp)
- fd.Close()
- res.Finish = true
- return res, err
- }
// kReadTimeout is the timeout, in seconds, passed to fixedLengthRead for
// every fixed-size read from the response body.
const kReadTimeout = 3

// Names of the frame headers inspected by analysisHeader.
const (
	kMessageType = ":message-type"
	kEventType   = ":event-type"
	kContentType = ":content-type"
)

// Frame kinds stored in ObjectSelectResult.FrameType. The numbering starts at
// 4 to keep the values these constants have always had, so the zero value of
// FrameType matches none of them.
const (
	kRecordsFrameType = iota + 4
	kContinuationFrameType
	kProgressFrameType
	kStatsFrameType
	kEndFrameType
	kErrorFrameType
)
// ProgressFrame is the XML payload of a "Progress" event frame. It is decoded
// from a <Progress> element whose children carry three integer counters.
type ProgressFrame struct {
	XMLName        xml.Name `xml:"Progress"`
	BytesScanned   int      `xml:"BytesScanned"`
	BytesProcessed int      `xml:"BytesProcessed"`
	BytesReturned  int      `xml:"BytesReturned"`
}
// StatsFrame is the XML payload of a "Stats" event frame. It is decoded from
// a <Stats> element whose children carry three integer counters.
type StatsFrame struct {
	XMLName        xml.Name `xml:"Stats"`
	BytesScanned   int      `xml:"BytesScanned"`
	BytesProcessed int      `xml:"BytesProcessed"`
	BytesReturned  int      `xml:"BytesReturned"`
}
// DataFrame tracks how far the reader has got through the payload of the
// current "Records" frame.
type DataFrame struct {
	// ContentType is the value of the frame's ":content-type" header, when
	// that header is present.
	ContentType string
	// ConsumedBytesLength counts the payload bytes of the current frame that
	// have already been handed to the caller; it is reset to 0 once the whole
	// payload has been read.
	ConsumedBytesLength int32
	// LeftBytesLength is not referenced by the code in this file section.
	LeftBytesLength int32
}
// ErrorFrame carries the ":error-code" and ":error-message" header values of
// an "error" message frame. It implements the error interface.
type ErrorFrame struct {
	Code    string
	Message string
}

// Error renders the frame as "Error Code: <Code>, Error Message: <Message>".
func (e *ErrorFrame) Error() string {
	return "Error Code: " + e.Code + ", Error Message: " + e.Message
}
- type ObjectSelectResult struct {
- TotalFrameLength int32
- TotalHeaderLength int32
- NextFrame bool
- FrameType int
- Payload []byte
- DataFrame DataFrame
- ProgressFrame ProgressFrame
- StatsFrame StatsFrame
- ErrorFrame *ErrorFrame
- }
- type ObjectSelectResponse struct {
- StatusCode int
- Headers http.Header
- Body io.ReadCloser
- Frame *ObjectSelectResult
- Finish bool
- }
- func (osr *ObjectSelectResponse) Read(p []byte) (n int, err error) {
- n, err = osr.readFrames(p)
- return
- }
- func (osr *ObjectSelectResponse) Close() error {
- return osr.Body.Close()
- }
- func (osr *ObjectSelectResponse) readFrames(p []byte) (int, error) {
- if osr.Finish {
- return 0, io.EOF
- }
- if osr.Frame.ErrorFrame != nil {
- return 0, osr.Frame.ErrorFrame
- }
- var err error
- var nlen int
- dlen := len(p)
- for nlen < dlen {
- if osr.Frame.NextFrame == true {
- osr.Frame.NextFrame = false
- err := osr.analysisPrelude()
- if err != nil {
- return nlen, err
- }
- err = osr.analysisHeader()
- if err != nil {
- return nlen, err
- }
- }
- switch osr.Frame.FrameType {
- case kRecordsFrameType:
- n, err := osr.analysisRecords(p[nlen:])
- if err != nil {
- return nlen, err
- }
- nlen += n
- case kContinuationFrameType:
- err = osr.payloadChecksum("ContinuationFrame")
- if err != nil {
- return nlen, err
- }
- case kProgressFrameType:
- err := osr.analysisXml(&osr.Frame.ProgressFrame)
- if err != nil {
- return nlen, err
- }
- case kStatsFrameType:
- err := osr.analysisXml(&osr.Frame.StatsFrame)
- if err != nil {
- return nlen, err
- }
- case kEndFrameType:
- err = osr.payloadChecksum("EndFrame")
- if err != nil {
- return nlen, err
- }
- osr.Finish = true
- return nlen, io.EOF
- case kErrorFrameType:
- return nlen, osr.Frame.ErrorFrame
- }
- }
- return nlen, err
- }
- func (osr *ObjectSelectResponse) analysisPrelude() error {
- frame := make([]byte, 12)
- _, err := osr.fixedLengthRead(frame, kReadTimeout)
- if err != nil {
- return err
- }
- var preludeCRC uint32
- bytesToInt(frame[0:4], &osr.Frame.TotalFrameLength)
- bytesToInt(frame[4:8], &osr.Frame.TotalHeaderLength)
- bytesToInt(frame[8:12], &preludeCRC)
- osr.Frame.Payload = append(osr.Frame.Payload, frame...)
- return checksum(frame[0:8], preludeCRC, "Prelude")
- }
- func (osr *ObjectSelectResponse) analysisHeader() error {
- var nlen int32
- headers := make(map[string]string)
- for nlen < osr.Frame.TotalHeaderLength {
- var headerNameLen int8
- var headerValueLen int16
- bHeaderNameLen := make([]byte, 1)
- _, err := osr.fixedLengthRead(bHeaderNameLen, kReadTimeout)
- if err != nil {
- return err
- }
- nlen += 1
- bytesToInt(bHeaderNameLen, &headerNameLen)
- osr.Frame.Payload = append(osr.Frame.Payload, bHeaderNameLen...)
- bHeaderName := make([]byte, headerNameLen)
- _, err = osr.fixedLengthRead(bHeaderName, kReadTimeout)
- if err != nil {
- return err
- }
- nlen += int32(headerNameLen)
- headerName := string(bHeaderName)
- osr.Frame.Payload = append(osr.Frame.Payload, bHeaderName...)
- bValueTypeLen := make([]byte, 3)
- _, err = osr.fixedLengthRead(bValueTypeLen, kReadTimeout)
- if err != nil {
- return err
- }
- nlen += 3
- bytesToInt(bValueTypeLen[1:], &headerValueLen)
- osr.Frame.Payload = append(osr.Frame.Payload, bValueTypeLen...)
- bHeaderValue := make([]byte, headerValueLen)
- _, err = osr.fixedLengthRead(bHeaderValue, kReadTimeout)
- if err != nil {
- return err
- }
- nlen += int32(headerValueLen)
- headers[headerName] = string(bHeaderValue)
- osr.Frame.Payload = append(osr.Frame.Payload, bHeaderValue...)
- }
- htype, ok := headers[kMessageType]
- if !ok {
- return fmt.Errorf("header parse failed, no message-type, headers: %+v\n", headers)
- }
- switch {
- case htype == "error":
- osr.Frame.FrameType = kErrorFrameType
- osr.Frame.ErrorFrame = &ErrorFrame{}
- osr.Frame.ErrorFrame.Code, _ = headers[":error-code"]
- osr.Frame.ErrorFrame.Message, _ = headers[":error-message"]
- case htype == "event":
- hevent, ok := headers[kEventType]
- if !ok {
- return fmt.Errorf("header parse failed, no event-type, headers: %+v\n", headers)
- }
- switch {
- case hevent == "Records":
- hContentType, ok := headers[kContentType]
- if ok {
- osr.Frame.DataFrame.ContentType = hContentType
- }
- osr.Frame.FrameType = kRecordsFrameType
- case hevent == "Cont":
- osr.Frame.FrameType = kContinuationFrameType
- case hevent == "Progress":
- osr.Frame.FrameType = kProgressFrameType
- case hevent == "Stats":
- osr.Frame.FrameType = kStatsFrameType
- case hevent == "End":
- osr.Frame.FrameType = kEndFrameType
- default:
- return fmt.Errorf("header parse failed, invalid event-type, headers: %+v\n", headers)
- }
- default:
- return fmt.Errorf("header parse failed, invalid message-type: headers: %+v\n", headers)
- }
- return nil
- }
- func (osr *ObjectSelectResponse) analysisRecords(data []byte) (int, error) {
- var needReadLength int32
- dlen := int32(len(data))
- restLen := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength - osr.Frame.DataFrame.ConsumedBytesLength
- if dlen <= restLen {
- needReadLength = dlen
- } else {
- needReadLength = restLen
- }
- n, err := osr.fixedLengthRead(data[:needReadLength], kReadTimeout)
- if err != nil {
- return n, fmt.Errorf("read data frame error: %s", err.Error())
- }
- osr.Frame.DataFrame.ConsumedBytesLength += int32(n)
- osr.Frame.Payload = append(osr.Frame.Payload, data[:needReadLength]...)
- // 读完了一帧数据并填充到data中了
- if osr.Frame.DataFrame.ConsumedBytesLength == osr.Frame.TotalFrameLength-16-osr.Frame.TotalHeaderLength {
- osr.Frame.DataFrame.ConsumedBytesLength = 0
- err = osr.payloadChecksum("RecordFrame")
- }
- return n, err
- }
- func (osr *ObjectSelectResponse) analysisXml(frame interface{}) error {
- payloadLength := osr.Frame.TotalFrameLength - 16 - osr.Frame.TotalHeaderLength
- bFrame := make([]byte, payloadLength)
- _, err := osr.fixedLengthRead(bFrame, kReadTimeout)
- if err != nil {
- return err
- }
- err = xml.Unmarshal(bFrame, frame)
- if err != nil {
- return err
- }
- osr.Frame.Payload = append(osr.Frame.Payload, bFrame...)
- return osr.payloadChecksum("XmlFrame")
- }
- // 调用payloadChecksum时,表示该帧已读完,开始读取下一帧内容
- func (osr *ObjectSelectResponse) payloadChecksum(ftype string) error {
- bcrc := make([]byte, 4)
- _, err := osr.fixedLengthRead(bcrc, kReadTimeout)
- if err != nil {
- return err
- }
- var res uint32
- bytesToInt(bcrc, &res)
- err = checksum(osr.Frame.Payload, res, ftype)
- osr.Frame.NextFrame = true
- osr.Frame.Payload = []byte{}
- return err
- }
- type chanReadIO struct {
- readLen int
- err error
- }
- func (osr *ObjectSelectResponse) fixedLengthRead(p []byte, read_timeout int64) (int, error) {
- timeout := time.Duration(read_timeout)
- r := osr.Body
- ch := make(chan chanReadIO, 1)
- defer close(ch)
- go func(p []byte) {
- var needLen int
- readChan := chanReadIO{}
- needLen = len(p)
- for {
- n, err := r.Read(p[readChan.readLen:needLen])
- readChan.readLen += n
- if err != nil {
- readChan.err = err
- ch <- readChan
- return
- }
- if readChan.readLen == needLen {
- break
- }
- }
- ch <- readChan
- }(p)
- select {
- case <-time.After(time.Second * timeout):
- return 0, fmt.Errorf("requestId: %s, readLen timeout, timeout is %d(second),need read:%d", "sr.Headers.Get(HTTPHeaderOssRequestID)", timeout, len(p))
- case result := <-ch:
- return result.readLen, result.err
- }
- }
// bytesToInt decodes b as a big-endian value into ret, which must be a
// pointer to a fixed-size type accepted by binary.Read (e.g. *int8, *uint16,
// *int32, *uint32). The decode error is discarded: if b is too short, ret is
// left unchanged.
func bytesToInt(b []byte, ret interface{}) {
	src := bytes.NewReader(b)
	binary.Read(src, binary.BigEndian, ret)
}
// checksum computes the CRC-32 (IEEE polynomial) of b and compares it with
// rec, the checksum received from the stream. It returns nil on a match and
// otherwise an error naming ftype together with both values.
func checksum(b []byte, rec uint32, ftype string) error {
	if cal := crc32.ChecksumIEEE(b); cal != rec {
		return fmt.Errorf("parse type: %v, checksum failed, cal: %v, rec: %v\n", ftype, cal, rec)
	}
	return nil
}
|