2 回答

TA貢獻1824條經驗 獲得超6個贊
public static Integer temp1;
public static Map<Integer,ArrayList<String>> temp2 = new HashMap<>();
final Map< Integer,ArrayList<String>> deger = new HashMap<>();
deger.put(1,new ArrayList<>(Arrays.asList("h:1","g:1","s:0")));
deger.put(2,new ArrayList<>(Arrays.asList("h:1","g:1","g:0")));
deger.put(3,new ArrayList<>(Arrays.asList("h:1","c:0","g:0")));
deger.put(4,new ArrayList<>(Arrays.asList("h:1","s:1","g:0")));
Pattern<String,?> pattern = Pattern.<String>begin("start").where(
new SimpleCondition<String>() {
@Override
public boolean filter(String value) throws Exception {
flag = false;
for(Map.Entry<Integer, ArrayList<String>> entryStart : deger.entrySet()) {
if(entryStart.getValue().contains(value) && !temp2.containsKey(entryStart.getKey())){
ArrayList<String> newList = new ArrayList<String>();
newList.addAll(entryStart.getValue());
newList.remove(value);
temp2.put(entryStart.getKey(),newList);
flag = true;
}
}
return flag;
}
}
).followedBy("middle").where(
new SimpleCondition<String>() {
@Override
public boolean filter(String middle) throws Exception {
flag = false;
for(Map.Entry<Integer, ArrayList<String>> entryMiddle : temp2.entrySet()) {
if(entryMiddle.getValue().contains(middle) && entryMiddle.getValue().size() == 2){
ArrayList<String> newListMiddle = new ArrayList<String>();
newListMiddle.addAll(entryMiddle.getValue());
newListMiddle.remove(middle);
temp2.put(entryMiddle.getKey(),newListMiddle);
flag = true;
}
}
return flag;
}
}
).followedBy("end").where(
new SimpleCondition<String>() {
@Override
public boolean filter(String end) throws Exception {
flag = false;
for(Map.Entry<Integer, ArrayList<String>> entryEnd : temp2.entrySet()) {
if(entryEnd.getValue().contains(end) && entryEnd.getValue().size() == 1){
flag = true;
temp1 = entryEnd.getKey();
}
}
if (flag)
temp2.remove(temp1);
return flag;
}
}
);
PatternStream<String> patternStream = CEP.pattern(stream_itemset_ham,pattern);
DataStream<String> result = patternStream.select(
new PatternSelectFunction<String, String>() {
@Override
public String select(Map<String, List<String>> map) throws Exception {
ArrayList<String> NewList= new ArrayList<>();
NewList.addAll(deger.get(temp1));
String found = "Found";
for (String list_element : NewList)
found += " " + list_element ;
return found;
}
}
);
result.print();
我從你的問題中了解到,可以提供這種解決方案。

TA貢獻2019條經驗 獲得超9個贊
所以目前AFAIK Flink不支持開箱即用的無序模式,所以基本上我看到了兩種解決這個問題的方法:
1)您可以創建要搜索的所有可能的模式,并簡單地合并所有生成的數據流。
2)正如這篇文章所建議的FlinkCEP:我可以引用一個較早的事件來定義后續匹配嗎?您可以嘗試使用它來允許您訪問已經匹配的先前元素,因此基本上您必須定義與列表中所有可能的元素匹配的模式,然后只需檢查最后一個條件,如果所有三個元素都屬于同一列表。如果是這樣,則找到該模式。IterativeCondition
添加回答
舉報