我在下面的beamSql查詢中:PCollectionTuple query0 = PCollectionTuple.of( new TupleTag<BeamRecord>("temp2"), temp2).and(new TupleTag<BeamRecord>("temp3"), temp3)")); PCollection<BeamRecord> rec_3 = query0.apply( BeamSql.queryMulti("SELECT a.*, \r\n" + "(case \r\n" + "when a.grp > 5 then 1 \r\n" + "when b.grp > 5 then 1 \r\n" + "else 0 end) as flag \r\n" + "from temp2 a left join \r\n" + "temp3 b on a.eventid = b.eventid and b.Weekint = c1(a.Weekint)").withUdf("c1", AddS.class));在上面的查詢中,我在表temp2和temp3之間進行左連接,在ON條件下,我使用名稱'AddS'調用UDF。在此UDF AddS中,將Weekint用作BigInt。UDF將Weekint用作輸入,并將其轉換為日期格式,然后向其添加7,然后將值返回為BigInt。以下是UDF:public static class AddS implements BeamSqlUdf { private static final long serialVersionUID = 1L; public static BigInteger eval(BigInteger input) throws ParseException{ SimpleDateFormat dateFormat = new SimpleDateFormat("yyyyMMdd"); String strdate = input.toString(); Date date1 = dateFormat.parse(strdate); Calendar c = Calendar.getInstance(); c.setTime(date1); c.add(Calendar.DATE, 7); String f =c.getTime().toString(); BigInteger x = new BigInteger(f); return (x); } } 我無法弄清楚是什么原因導致此錯誤,可能是UDF創建不正確或未正確調用?或如果有人可以向我解釋此錯誤的原因。
1 回答

青春有我
TA貢獻1784條經驗 獲得超8個贊
您的UDF創建不正確。Beam SQL在內部不支持Java BigInteger類型。如果您的SQL數據類型為BigInt,則應改用Java Long類型。
添加回答
舉報
0/150
提交
取消