我們正在創建一個數據流管道,我們將從 postgres 讀取數據并將其寫入鑲木地板文件。ParquetIO.Sink 允許您將 GenericRecord 的 PCollection 寫入 Parquet 文件(來自此處https://beam.apache.org/releases/javadoc/2.5.0/org/apache/beam/sdk/io/parquet/ParquetIO。網頁)。但是鑲木地板文件架構并不像我預期的那樣這是我的模式:schema = new org.apache.avro.Schema.Parser().parse("{\n" + " \"type\": \"record\",\n" + " \"namespace\": \"com.example\",\n" + " \"name\": \"Patterns\",\n" + " \"fields\": [\n" + " { \"name\": \"id\", \"type\": \"string\" },\n" + " { \"name\": \"name\", \"type\": \"string\" },\n" + " { \"name\": \"createdAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" + " { \"name\": \"updatedAt\", \"type\": {\"type\":\"string\",\"logicalType\":\"timestamps-millis\"} },\n" + " { \"name\": \"steps\", \"type\": [\"null\",{\"type\":\"array\",\"items\":{\"type\":\"string\",\"name\":\"json\"}}] },\n" + " ]\n" + "}");到目前為止,這是我的代碼:Pipeline p = Pipeline.create( PipelineOptionsFactory.fromArgs(args).withValidation().create());p.apply(JdbcIO.<GenericRecord> read() .withDataSourceConfiguration(JdbcIO.DataSourceConfiguration.create( "org.postgresql.Driver", "jdbc:postgresql://localhost:port/database") .withUsername("username") .withPassword("password")) .withQuery("select * from table limit(10)") .withCoder(AvroCoder.of(schema)) .withRowMapper((JdbcIO.RowMapper<GenericRecord>) resultSet -> { GenericRecord record = new GenericData.Record(schema); ResultSetMetaData metadata = resultSet.getMetaData(); int columnsNumber = metadata.getColumnCount();
2 回答

皈依舞
TA貢獻1851條經驗 獲得超3個贊
我沒有找到從 Avro 創建不在 GroupType 中的重復元素的方法。
Beam 中的 ParquetIO 使用項目中定義的“標準”avro 轉換,在這里parquet-mr
實現。
似乎有兩種方法可以將 Avro ARRAY 字段轉換為 Parquet 消息——但它們都沒有創建您正在尋找的內容。
目前,avro 轉換是目前與 ParquetIO 交互的唯一方式。我在 ParquetIO 中看到了這個 JIRA Use Beam 模式,將其擴展到 Beam Rows,這可能允許不同的 parquet 消息策略。
或者,您可以為 ParquetIO 創建 JIRA 功能請求以支持 thrift 結構,這應該允許更好地控制 parquet 結構。

FFIVE
TA貢獻1797條經驗 獲得超6個贊
它是您用來描述預期模式的 protobuf 消息嗎?我認為您得到的是從指定的 JSON 模式正確生成的。optional repeated
在 protobuf 語言規范中沒有意義:https://developers.google.com/protocol-buffers/docs/reference/proto2-spec
您可以刪除null
方括號以生成簡單的repeated
字段,它在語義上等同于optional repeated
(因為repeated
意味著零次或多次)。
添加回答
舉報
0/150
提交
取消