目前,我正在使用來自組織內 RabbitMQ 隊列的消息。每天,我需要將所有收到的消息推送到一個csv中,該csv最終將作為Datawarehouse中的表登陸。代碼總是在監聽隊列,理想情況下,我希望將數據流式傳輸到csv。#callback funtion on receiving messagesdef onMessage(channel, method, properties, body): print(body)while True: try: #connect credentials = pika.PlainCredentials(username, password) connection = pika.BlockingConnection(pika.ConnectionParameters(host = server, port = port, virtual_host = vhost, credentials = credentials))channel = connection.channel()channel.basic_consume(on_message_callback = onMessage, queue = queueName, auto_ack = True) channel.start_consuming()開始使用隊列后收到的輸出如下所示:這是收到的一行數據。它基本上返回一個json對象,但是b'{“metrics”:在使用json對象時需要刪除。b'{“metrics”:[{“ci_id”:“SPN-EQSHATA1”,“client_id”:“39956e6fdb256757567567567433333193a”,“name”:“deviceHealthScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099642,“unit”:client_id ci_id“configAssuranceScore”,“source_id”:“Global”,“source_management_platform”:“XXX”,“timestamp”:1582886099325,“unit”:“count”,“value”:“1.0”},{“ci_id”:”SPN-EQSHATA1“,”client_id“:”39956e6fdb25675756756743333193a“,”name“:”imageAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”count“,”value“:”1.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567567433333193a“,”name“:”vulnerabilityAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099325,”unit“:”count“,”value“:”10.0“},{”ci_id“:”SPN-EQSHATA1“,”client_id“:”39956e6fdb256757567567433333193a“,”name“:”overallAssuranceScore“,”source_id“:”Global“,”source_management_platform“:”XXX“,”timestamp“:1582886099642,”unit“:”count“,”value“:”5.5“}],”emr_published_on“:1582886099642}'
1 回答

炎炎設計
TA貢獻1808條經驗 獲得超4個贊
b'...'
只是意味著你得到了一個json模塊可以愉快地處理的字節字符串。您將獲得一個字典,對于鍵,它具有字典列表的值。該列表可以直接饋送數據幀。metrics
這意味著您可以像以下方式一樣簡單地處理它:
df = pd.DataFrame(json.loads(body)['metrics'])
添加回答
舉報
0/150
提交
取消