我正在嘗試使用 Apache Beam 讀取 avro 文件并使用 Beam SQL 來轉換數據。我對 Beam 和 Java 還是新手。這是我的簡單代碼:public class BeamSQLReadAvro { @SuppressWarnings("serial") public static void main(String[] args) throws IOException { PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create(); Pipeline p = Pipeline.create(options); /* Schema definition */ Schema schema = new Schema.Parser().parse(new File("data/RATE_CODE/RATE_CODE.avsc")); /* Create record/row */ PCollection<GenericRecord> records = p.apply(AvroIO.readGenericRecords(schema).from("data/RATE_CODE/*.avro")); /* SQL Transform */ records.apply("SQL Transform 01",SqlTransform.query("SELECT RCODE,RNAME,RDESC FROM PCOLLECTION LIMIT 10")) /* Print output */ .apply("Output", MapElements.via( new SimpleFunction<Row, Row>() { @Override public Row apply(Row input) { System.out.println("PCOLLECTION: " + input.getValues()); return input; } } ) ); p.run().waitUntilFinish(); }}它給了我錯誤Exception in thread "main" java.lang.IllegalStateException: Cannot call getSchema when there is no schema我不明白,我定義了一個名為 schema 的變量。這里有什么指點嗎?
1 回答

慕姐4208626
TA貢獻1852條經驗 獲得超7個贊
實際上,您的管道中有兩種類型的模式 - Avro 和 Beam 模式。Avro 模式用于解析 Avro 輸入記錄,但對于 SQL 轉換,您應該使用具有 Beam 模式的行。為此,AvroIO
提供一個選項withBeamSchemas(boolean)
,應true
根據您的情況設置為,例如:
AvroIO.readGenericRecords(schema).withBeamSchemas(true).from("data/RATE_CODE/*.avro")
添加回答
舉報
0/150
提交
取消