亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

如何使用 confluent 的 REST 代理將 JSON 數據發送到 kafka?

如何使用 confluent 的 REST 代理將 JSON 數據發送到 kafka?

慕絲7291255 2021-11-02 16:58:39
對于我的學士論文,我嘗試使用 http 連接向 kafka 發送機器數據(在這種情況下,使用 python 腳本發送的歷史數據)。我正在使用在 Windows 系統上的 docker 中運行的融合平臺。使用 python 腳本,我嘗試將數據發送到 REST 代理。起初,我收到了關于我能夠解決的數據類型的錯誤響應。import pandas as pdimport csv, os, json, requests, time, datetime, copy, sysif len(sys.argv) > 1:    bgrfc_value = str(sys.argv[1])else:    print("No arguments for bgrfc given, defaulting to 'false'")    bgrfc_value = 'false'if len(sys.argv) > 2:    filePath = str(sys.argv[2])else:    filePath = "path"if len(sys.argv) > 3:    batchSize = int(float(str(sys.argv[3])))else:    batchSize = 10# Build skeleton JSONbasejson = {"message": {"meta" : "", "data": ""}}#metajson = [{'meta_key' : 'sender', 'meta_value': 'OPCR'},#           {'meta_key' : 'receiver', 'meta_value': 'CAT'},#            {'meta_key' : 'message_type', 'meta_value': 'MA1SEK'},#            {'meta_key' : 'bgrfc', 'meta_value': bgrfc_value}]#basejson['message']['meta'] = metajsonurl = "http://127.0.0.1:8082/"headers = {'Content-Type':'application/json','Accept':'application/json'}def assign_timestamps(batch):    newtimestamps = []    oldtimestamps = []    # Batch timestamps to list, add 10 newly generated timestamps to a list    for item in batch['tag_tsp'].values.tolist():        newtimestamps.append(datetime.datetime.now())        oldtimestamps.append(datetime.datetime.strptime(str(item), "%Y%m%d%H%M%S.%f"))    # Sort old timestamps without sorting the original array to preserve variance    temp = copy.deepcopy(oldtimestamps)    temp.sort()    mrtimestamp = temp[0]    # Replicate variance of old timestamps into the new timestamps    for x in range(batchSize):        diff = mrtimestamp - oldtimestamps[x]        newtimestamps[x] = newtimestamps[x] - diff        newtimestamps[x] = newtimestamps[x].strftime("%Y%m%d%H%M%S.%f")[:-3]    # Switch old timestamps with new timestamps    batch['tag_tsp'] = newtimestamps    return batch該腳本發送數據,但作為響應,我得到狀態代碼 500。
查看完整描述

2 回答

?
茅侃侃

TA貢獻1842條經驗 獲得超22個贊

您的標頭值不正確。你需要設置Accept和Content-type兩個頭下面給出:


 Accept: application/vnd.kafka.v2+json

 Content-Type : application/vnd.kafka.json.v2+json

此外,數據應按以下方式結構化:


{"records":[{"value":{<Put your json record here>}}]}

例如 :


{"records":[{"value":{"foo":"bar"}}]}


查看完整回答
反對 回復 2021-11-02
?
烙印99

TA貢獻1829條經驗 獲得超13個贊

我相信您放入“value”的數據必須是字符串。像這樣的事情會起作用:

{"records":[{"value":"{'foo':'bar'}"}]}

如果您在閱讀主題時收到一條有趣的消息,請嘗試使用 base64 編碼對消息進行編碼。編碼后的原始 json 字符串應如下所示:

{"records":[{"value":"eyJmb28iOiJiYXIifQ=="}]}


查看完整回答
反對 回復 2021-11-02
  • 2 回答
  • 0 關注
  • 261 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號