为了账号安全,请及时绑定邮箱和手机立即绑定

并行执行 DynamoDB 查询(全局二级索引的 BatchGetItems)

并行执行 DynamoDB 查询(全局二级索引的 BatchGetItems)

Go
当年话下 2023-06-12 16:11:38
当查询在 GSI 上运行时,这里的想法是并行运行多个 DynamoDB 查询。截至目前,BatchGetItems不支持查询索引,推荐的方法是并行查询数据。我正在使用带有 wg 的 go routines 来并行处理例程的执行。该函数的输入是一个带有 ID 的字符串数组,输出是 ID 的属性。在本地运行该函数时没有问题,但是在AWS-Lambda上运行该函数时,返回的数据不断增长;IE; 输入 2 项应输出 2 项。如果函数在 AWS-Lambda 上测试,函数第一次返回 2 个项目第二次返回 4 个项目(相同的项目重复 2 次)第三次它返回 6 个项目(相同的项目重复 4 次)等等。这是代码片段。每次运行 lambda 时,是否有什么没有正确处理让 lambda 输出额外的数据集?package mainimport (    "context"    "fmt"    "os"    "sync"    "github.com/aws/aws-lambda-go/lambda"    "github.com/aws/aws-sdk-go/aws"    "github.com/aws/aws-sdk-go/aws/session"    "github.com/aws/aws-sdk-go/service/dynamodb"    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute")//Final Output Interfacevar bulkOutput []interface{}func exitWithError(err error) {    fmt.Fprintln(os.Stderr, err)    os.Exit(1)}//LambdaInputJSON input for the lambda handlertype LambdaInputJSON struct {    Ids      []string `json:"ids,omitempty"`}//HandleRequest : Lambda entry pointfunc HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {    return DynamoDBBatchGetRecords(data), nil}func main() {    lambda.Start(HandleRequest)}func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {    var wg sync.WaitGroup    var mutex = &sync.Mutex{}    iterations := len(a.Ids)    wg.Add(iterations)    for i := 0; i < iterations; i++ {        go QueryOutput(a.Ids[i], &wg, mutex)    }    wg.Wait()    return bulkOutput}//QueryOutput GoRoutinefunc QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {    var outputData []interface{}    defer wg.Done()    sess, err := session.NewSession(&aws.Config{        Region: aws.String("aws-region"),    })    if err != nil {        exitWithError(fmt.Errorf("failed to make Query API call, %v", err))    }
查看完整描述

1 回答

?
繁花不似锦

TA贡献1851条经验 获得超4个赞

根据文档,全局变量独立于您的 Lambda 函数的处理程序代码。这导致缓冲区随着时间的推移而增加。

纠正后的参考粘贴在下面。

package main


import (

    "context"

    "fmt"

    "os"

    "sync"


    "github.com/aws/aws-lambda-go/lambda"

    "github.com/aws/aws-sdk-go/aws"

    "github.com/aws/aws-sdk-go/aws/session"

    "github.com/aws/aws-sdk-go/service/dynamodb"

    "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"

)


func exitWithError(err error) {

    fmt.Fprintln(os.Stderr, err)

    os.Exit(1)

}


//HandleRequest : Lambda entry point

func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {

    output := DynamoDBBatchGetRecords(data)

    return output, nil

}


func main() {

    lambda.Start(HandleRequest)

}


func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

    var dataOut []interface{}

    var wg = &sync.WaitGroup{}

    var mtx = &sync.Mutex{}


    iterations := len(a.Ids)

    wg.Add(iterations)

    for i := 0; i < i; i++ {

        go func(i int) {

            defer wg.Done()

            var outputData []interface{}

            sess, err := session.NewSession(&aws.Config{

                Region: aws.String("aws-region"),

            })

            if err != nil {

                exitWithError(fmt.Errorf("failed to make Query API call, %v", err))

            }

            ddb := dynamodb.New(sess)

            queryInput := &dynamodb.QueryInput{

                Limit:            aws.Int64(1),

                TableName:        aws.String("table"),

                IndexName:        aws.String("index"),

                ScanIndexForward: aws.Bool(false),

                ConsistentRead: aws.Bool(false),

                KeyConditions: map[string]*dynamodb.Condition{

                    "index-column": {

                        ComparisonOperator: aws.String("EQ"),

                        AttributeValueList: []*dynamodb.AttributeValue{

                            {

                                S: aws.String(a.Ids[i]),

                            },

                        },

                    },

                },

            }

            output, err := ddb.Query(queryInput)


            if err != nil {

                exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))

            }

            err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)

            if err != nil {

                exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))

            }


            mtx.Lock()

            dataOut = append(dataOut, outputData[0])

            mtx.Unlock()


        }(i)

    }

    wg.Wait()

    return dataOut

}


查看完整回答
反对 回复 2023-06-12
  • 1 回答
  • 0 关注
  • 112 浏览
慕课专栏
更多

添加回答

举报

0/150
提交
取消
意见反馈 帮助中心 APP下载
官方微信