
Golang - error when gzipping cursor data from a mongodb find query, writing it to a file, and decompressing it


Go
开心每一天1111 2022-06-01 12:24:39
I am iterating over a mongodb cursor, gzipping the data, and sending it to an S3 object. When I try to decompress the uploaded file with gzip -d, I get the following errors:

gzip: 9.log.gz: invalid compressed data--crc error
gzip: 9.log.gz: invalid compressed data--length error

The code I use to iterate, gzip, and upload is given below:

// CursorReader struct acts as reader wrapper on top of mongodb cursor
type CursorReader struct {
    Csr *mongo.Cursor
}

// Read func reads the data from cursor and puts it into byte array
func (cr *CursorReader) Read(p []byte) (n int, err error) {
    dataAvail := cr.Csr.Next(context.TODO())
    if !dataAvail {
        n = 0
        err = io.EOF
        if cr.Csr.Close(context.TODO()) != nil {
            fmt.Fprintf(os.Stderr, "Error: MongoDB: getting logs: close cursor: %s", err)
        }
        return
    }
    var b bytes.Buffer
    w := gzip.NewWriter(&b)
    w.Write([]byte(cr.Csr.Current.String() + "\n"))
    w.Close()
    n = copy(p, []byte(b.String()))
    err = nil
    return
}

cursor, err := coll.Find(ctx, filter) // runs the find query and returns a cursor
csrRdr := new(CursorReader)           // creates a new CursorReader instance
csrRdr.Csr = cursor                   // assigns the find cursor to the CursorReader instance
_, err = s3Uploader.Upload(&s3manager.UploadInput{ // uploads the data to S3 in parts
    Bucket: aws.String("bucket"),
    Key:    aws.String("key"),
    Body:   csrRdr,
})

If the data is small, I don't hit the problem, but when the data is large I get the error. What I have debugged so far: I tried gzipping 1500 documents of about 15MB each and got the error; even when I write the gzipped bytes directly to a local file instead of uploading, I get the same error.

1 Answer

Helenr


The problem appears to be the repeated call to gzip.NewWriter() inside func (*CursorReader) Read([]byte) (int, error).


You are allocating a new gzip.Writer on every call to Read. gzip compression is stateful, so you must use a single Writer instance for all the data you want to end up in one compressed stream.
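
To illustrate the point (a minimal, standalone sketch, not part of the original post): writing everything through one gzip.Writer and calling Close exactly once produces a single gzip member whose trailer (CRC-32 and length) covers all of the data, which is what gzip -d expects.

package main

import (
    "bytes"
    "compress/gzip"
    "fmt"
    "io"
)

func main() {
    var buf bytes.Buffer

    // one Writer for the whole stream; its internal state spans every Write
    gz := gzip.NewWriter(&buf)
    for _, line := range []string{"doc-1\n", "doc-2\n", "doc-3\n"} {
        if _, err := io.WriteString(gz, line); err != nil {
            panic(err)
        }
    }
    // Close flushes the remaining data and writes the trailer (CRC-32 + length)
    if err := gz.Close(); err != nil {
        panic(err)
    }

    // decompress to verify the stream round-trips intact
    r, err := gzip.NewReader(&buf)
    if err != nil {
        panic(err)
    }
    out, err := io.ReadAll(r)
    if err != nil {
        panic(err)
    }
    fmt.Print(string(out))
}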


Solution #1

A fairly simple fix for your problem is to read all rows from the cursor, pass them through a single gzip.Writer, and store the gzipped content in an in-memory buffer.


var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// prepare a buffer to hold gzipped data
var buffer bytes.Buffer
var gz = gzip.NewWriter(&buffer)

for cursor.Next(context.TODO()) {
    if _, err := io.WriteString(gz, cursor.Current.String()); err != nil {
        // handle error somehow  ¯\_(ツ)_/¯
    }
}

// close the gzip stream before reading from the buffer,
// so the trailer (CRC and length) is flushed into it
if err := gz.Close(); err != nil {
    // handle error somehow  ¯\_(ツ)_/¯
}

// you can now use buffer as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err := s3Uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("..."),
    Body:   &buffer,
})

Solution #2

Another solution is to use io.Pipe() together with a goroutine to create a stream that reads and compresses the data on demand, instead of holding it all in an in-memory buffer. This is useful if the data you are reading is very large and you cannot keep it all in memory.


var cursor, _ = collection.Find(context.TODO(), filter)
defer cursor.Close(context.TODO())

// create pipe endpoints
reader, writer := io.Pipe()

// note: io.Pipe() returns a synchronous in-memory pipe
// reads and writes block on one another
// make sure to go through docs once.

// now, since reads and writes on a pipe block,
// we must move to a background goroutine else
// all our writes would block forever
go func() {
    // order of defer here is important
    // see: https://stackoverflow.com/a/24720120/6611700
    // make sure gzip stream is closed before the pipe
    // to ensure data is flushed properly
    defer writer.Close()
    var gz = gzip.NewWriter(writer)
    defer gz.Close()

    for cursor.Next(context.Background()) {
        if _, err := io.WriteString(gz, cursor.Current.String()); err != nil {
            // handle error somehow  ¯\_(ツ)_/¯
        }
    }
}()

// you can now use reader as io.Reader
// and it'll contain gzipped data for your serialized rows
_, err := s3Uploader.Upload(&s3manager.UploadInput{
    Bucket: aws.String("..."),
    Key:    aws.String("..."),
    Body:   reader,
})
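
As a follow-up on the "handle error somehow" comment (a sketch using the same writer, cursor, and gz from the example above, not part of the original answer): if writing to the gzip stream fails inside the goroutine, one option is to close the pipe writer with that error via CloseWithError, so the uploader's Read returns the error instead of silently uploading a truncated object.

go func() {
    gz := gzip.NewWriter(writer)

    for cursor.Next(context.Background()) {
        if _, err := io.WriteString(gz, cursor.Current.String()); err != nil {
            gz.Close()
            writer.CloseWithError(err) // the reader side (the S3 upload) will see this error
            return
        }
    }

    // close the gzip stream first so the trailer is flushed, then the pipe
    if err := gz.Close(); err != nil {
        writer.CloseWithError(err)
        return
    }
    writer.Close()
}()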

