我正在使用 Apache Beam Go SDK 并且很難以正確的格式獲取PCollection以按鍵進行分組/組合。我在 PCollection 的字符串中每個鍵有多個記錄,如下所示:Bob, catBob, dogCarla, catCarla, bunnyDoug, horse我想使用GroupByKey和CombinePerKey,這樣我就可以像這樣匯總每個人的寵物:Bob, [cat, dog]Carla, [cat, bunny]Doug, [horse]如何將 PCollection<string> 轉換為 PCollection<KV<string, string>>?他們在這里提到了類似的東西,但不包括聚合字符串值的代碼。我可以使用 ParDo 獲取字符串鍵和字符串值,如下所示,但我不知道如何轉換為 GroupPerKey 輸入所需的 KV<string, string> 或 CoGBK<string, string> 格式。pcolOut := beam.ParDo(s, func(line string) (string, string) { cleanString := strings.TrimSpace(line) openingChar := "," iStart := strings.Index(cleanString, openingChar) key := cleanString[0:iStart] value := cleanString[iStart+1:] // How to convert to PCollection<KV<string, string>> before returning? return key, value}, pcolIn)groupedKV := beam.GroupByKey(s, pcolOut) 它失敗并出現以下錯誤。有什么建議么?panic: inserting ParDo in scope root creating new DoFn in scope root binding fn main.main.func2 binding params [{Value string} {Value string}] to input CoGBK<string,string>values of CoGBK<string,string> cannot bind to {Value string}
1 回答

汪汪一只貓
TA貢獻1898條經驗 獲得超8個贊
要映射到 KV,您可以應用 MapElements 并使用 into() 來設置 KV 類型,并在 via() 邏輯中創建一個新KV.of(myKey, myValue)的 ,例如,要獲取一個KV<String,String>,請使用以下內容:
PCollection<KV<String, String>> kvPairs = linkpages.apply(MapElements.into(
TypeDescriptors.kvs(
TypeDescriptors.strings(),
TypeDescriptors.strings()))
.via(
linkpage -> KV.of(dataFile, linkpage)));
- 1 回答
- 0 關注
- 125 瀏覽
添加回答
舉報
0/150
提交
取消