2 回答

TA貢獻1824條經驗 獲得超8個贊
在與我的同事考慮后,我們決定實施和使用_ingestAPI,而不是在 ES 中創建一個管道,該管道在每個文檔上插入服務器文檔攝取日期。
腳步:
創建時間戳管道
PUT _ingest/pipeline/timestamp_pipeline
{
? "description" : "Inserts timestamp field for all documents",
? "processors" : [
? ? {
? ? ? "set" : {
? ? ? ? "field": "insert_date",
? ? ? ? "value": "{{_ingest.timestamp}}"
? ? ? }
? ? }
? ]
}
更新索引以添加新的默認字段
PUT /*/_settings
{
? "index" : {
? ? "default_pipeline": "timestamp_pipeline"
? }
}
在 Python 中,我會_scroll像這樣使用 API:
? ? es = Elasticsearch(cfg.esUrl, port = cfg.esPort, timeout = 200)
? ? doc = {
? ? ? "query": {
? ? ? ? "range": {
? ? ? ? ? "insert_date": {
? ? ? ? ? ? "gte": lastRowDateOffset
? ? ? ? ? }
? ? ? ? }
? ? ? }
? ? }
? ? res = es.search(
? ? ? ? index = Index,
? ? ? ? sort = "insert_date:asc",
? ? ? ? scroll = "2m",
? ? ? ? size = NumberOfResultsPerPage,
? ? ? ? body = doc
? ? )
lastRowDateOffset最后一次跑步的日期在哪里

TA貢獻1871條經驗 獲得超8個贊
我想您正在尋找的是一個流式傳輸/更改 API,@Val 在這里對此進行了很好的描述,還有一個開放的功能請求。
與此同時,您不能真正依賴size
和from
參數——您可能會進行冗余查詢并在重復項到達您的數據倉庫之前對其進行處理。
另一種選擇是在這方面跳過 ES 并直接流式傳輸到倉庫嗎?我的意思是,在給定時間之前拍攝一次 ES 快照(這樣您就可以保留歷史數據),將其提供給倉庫,然后直接從您獲取數據的地方流式傳輸到倉庫。
附錄
AFAIK 默認排序是按插入日期。但是沒有內部_insertTime
或類似的東西。你可以使用游標——它被稱為滾動,這是一個 py實現。但這是從“最新”文檔到“第一個”文檔,反之亦然。所以它會給你所有現有的文檔,但我不太確定你滾動時新添加的文檔。然后你想再次運行滾動,這是次優的。
您還可以預先排序您的索引,當結合滾動時,它應該非常適合您的用例。
添加回答
舉報