亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

并行執行 DynamoDB 查詢(全局二級索引的 BatchGetItems)

并行執行 DynamoDB 查詢(全局二級索引的 BatchGetItems)

Go
當年話下 2023-06-12 16:11:38
當查詢在 GSI 上運行時,這里的想法是并行運行多個 DynamoDB 查詢。截至目前,BatchGetItems不支持查詢索引,推薦的方法是并行查詢數據。我正在使用帶有 wg 的 go routines 來并行處理例程的執行。該函數的輸入是一個帶有 ID 的字符串數組,輸出是 ID 的屬性。在本地運行該函數時沒有問題,但是在AWS-Lambda上運行該函數時,返回的數據不斷增長;IE;?輸入 2 項應輸出 2 項。如果函數在 AWS-Lambda 上測試,函數第一次返回 2 個項目第二次返回 4 個項目(相同的項目重復 2 次)第三次它返回 6 個項目(相同的項目重復 4 次)等等。這是代碼片段。每次運行 lambda 時,是否有什么沒有正確處理讓 lambda 輸出額外的數據集?package mainimport (? ? "context"? ? "fmt"? ? "os"? ? "sync"? ? "github.com/aws/aws-lambda-go/lambda"? ? "github.com/aws/aws-sdk-go/aws"? ? "github.com/aws/aws-sdk-go/aws/session"? ? "github.com/aws/aws-sdk-go/service/dynamodb"? ? "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute")//Final Output Interfacevar bulkOutput []interface{}func exitWithError(err error) {? ? fmt.Fprintln(os.Stderr, err)? ? os.Exit(1)}//LambdaInputJSON input for the lambda handlertype LambdaInputJSON struct {? ? Ids? ? ? []string `json:"ids,omitempty"`}//HandleRequest : Lambda entry pointfunc HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {? ? return DynamoDBBatchGetRecords(data), nil}func main() {? ? lambda.Start(HandleRequest)}func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {? ? var wg sync.WaitGroup? ? var mutex = &sync.Mutex{}? ? iterations := len(a.Ids)? ? wg.Add(iterations)? ? for i := 0; i < iterations; i++ {? ? ? ? go QueryOutput(a.Ids[i], &wg, mutex)? ? }? ? wg.Wait()? ? return bulkOutput}//QueryOutput GoRoutinefunc QueryOutput(data string, wg *sync.WaitGroup, mtx *sync.Mutex) {? ? var outputData []interface{}? ? defer wg.Done()? ? sess, err := session.NewSession(&aws.Config{? ? ? ? Region: aws.String("aws-region"),? ? })? ? if err != nil {? ? ? ? exitWithError(fmt.Errorf("failed to make Query API call, %v", err))? ? }
查看完整描述

1 回答

?
繁花不似錦

TA貢獻1851條經驗 獲得超4個贊

根據文檔,全局變量獨立于您的 Lambda 函數的處理程序代碼。這導致緩沖區隨著時間的推移而增加。

糾正后的參考粘貼在下面。

package main


import (

? ? "context"

? ? "fmt"

? ? "os"

? ? "sync"


? ? "github.com/aws/aws-lambda-go/lambda"

? ? "github.com/aws/aws-sdk-go/aws"

? ? "github.com/aws/aws-sdk-go/aws/session"

? ? "github.com/aws/aws-sdk-go/service/dynamodb"

? ? "github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"

)


func exitWithError(err error) {

? ? fmt.Fprintln(os.Stderr, err)

? ? os.Exit(1)

}


//HandleRequest : Lambda entry point

func HandleRequest(ctx context.Context, data LambdaInputJSON) ([]interface{}, error) {

? ? output := DynamoDBBatchGetRecords(data)

? ? return output, nil

}


func main() {

? ? lambda.Start(HandleRequest)

}


func DynamoDBBatchGetRecords(a LambdaInputJSON) []interface{} {

? ? var dataOut []interface{}

? ? var wg = &sync.WaitGroup{}

? ? var mtx = &sync.Mutex{}


? ? iterations := len(a.Ids)

? ? wg.Add(iterations)

? ? for i := 0; i < i; i++ {

? ? ? ? go func(i int) {

? ? ? ? ? ? defer wg.Done()

? ? ? ? ? ? var outputData []interface{}

? ? ? ? ? ? sess, err := session.NewSession(&aws.Config{

? ? ? ? ? ? ? ? Region: aws.String("aws-region"),

? ? ? ? ? ? })

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? exitWithError(fmt.Errorf("failed to make Query API call, %v", err))

? ? ? ? ? ? }

? ? ? ? ? ? ddb := dynamodb.New(sess)

? ? ? ? ? ? queryInput := &dynamodb.QueryInput{

? ? ? ? ? ? ? ? Limit:? ? ? ? ? ? aws.Int64(1),

? ? ? ? ? ? ? ? TableName:? ? ? ? aws.String("table"),

? ? ? ? ? ? ? ? IndexName:? ? ? ? aws.String("index"),

? ? ? ? ? ? ? ? ScanIndexForward: aws.Bool(false),

? ? ? ? ? ? ? ? ConsistentRead: aws.Bool(false),

? ? ? ? ? ? ? ? KeyConditions: map[string]*dynamodb.Condition{

? ? ? ? ? ? ? ? ? ? "index-column": {

? ? ? ? ? ? ? ? ? ? ? ? ComparisonOperator: aws.String("EQ"),

? ? ? ? ? ? ? ? ? ? ? ? AttributeValueList: []*dynamodb.AttributeValue{

? ? ? ? ? ? ? ? ? ? ? ? ? ? {

? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? S: aws.String(a.Ids[i]),

? ? ? ? ? ? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? ? ? },

? ? ? ? ? ? ? ? },

? ? ? ? ? ? }

? ? ? ? ? ? output, err := ddb.Query(queryInput)


? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? exitWithError(fmt.Errorf("E1 failed to make Query API call, %v", err))

? ? ? ? ? ? }

? ? ? ? ? ? err = dynamodbattribute.UnmarshalListOfMaps(output.Items, &outputData)

? ? ? ? ? ? if err != nil {

? ? ? ? ? ? ? ? exitWithError(fmt.Errorf("E2 failed to unmarshal Query result items, %v", err))

? ? ? ? ? ? }


? ? ? ? ? ? mtx.Lock()

? ? ? ? ? ? dataOut = append(dataOut, outputData[0])

? ? ? ? ? ? mtx.Unlock()


? ? ? ? }(i)

? ? }

? ? wg.Wait()

? ? return dataOut

}


查看完整回答
反對 回復 2023-06-12
  • 1 回答
  • 0 關注
  • 187 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號