亚洲在线久爱草,狠狠天天香蕉网,天天搞日日干久草,伊人亚洲日本欧美

為了賬號安全,請及時綁定郵箱和手機立即綁定
已解決430363個問題,去搜搜看,總會有你想問的

解析 Pyspark 數據框中的 XML 列

解析 Pyspark 數據框中的 XML 列

qq_笑_17 2023-05-23 19:12:10
我對 PySpark 比較陌生,正在嘗試解決數據問題。我有一個 pyspark DF,它是用從 MS SQL Server 中提取的數據創建的,有 2 列:ID(整數)和 XMLMsg(字符串)。第 2 列 XMLMsg 包含 XML 格式的數據。目標是解析 XMLMsg 列并在同一個 DF 中使用從 XML 中提取的列創建其他列。以下是 pyspark DF 的示例結構:ID  XMLMsg101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>...102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>...103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>...預期輸出是:ID  XMLMsg                                              b       c       d101 ...<a><b>name1</b><c>loc1</c></a>...<d>dept1</d>... name1   loc1    dept1102 ...<a><b>name2</b><c>loc2</c></a>...<d>dept2</d>... name2   loc2    dept2103 ...<a><b>name3</b><c>loc3</c></a>...<d>dept3</d>... name3   loc3    dept3根據我在 SO 中的搜索,我嘗試了一些建議;然而,未能達到預期的效果。因此,尋求一些幫助和指導。謝謝你的時間。
查看完整描述

1 回答

?
largeQ

TA貢獻2039條經驗 獲得超8個贊

考慮到我必須從一個巨大的 XML 文件中獲取來自 4 個節點的文本,我最終使用 Lambda 和 UDF 解決了這個問題。由于 XML 文件已經在列中并且是 pyspark Dataframe 的一部分,我不想寫成文件并再次解析整個 XML。我還想避免使用 XSD 架構。實際的 xml 有多個命名空間和一些具有特定條件的節點。例子:


<ap:applicationproduct xmlns:xsi="http://www.example.com/2005/XMLSchema-instance" xmlns:ap="http://example.com/productinfo/1_6" xmlns:ct="http://example.com/commontypes/1_0" xmlns:dc="http://example.com/datacontent/1_0" xmlns:tp="http://aexample.com/prmvalue/1_0" ....." schemaVersion="..">

  <ap:ParameterInfo>

    <ap:Header>

      <ct:Version>1.0</ct:Version>

      <ct:Sender>ABC</ct:Sender>

      <ct:SenderVersion />

      <ct:SendTime>...</ct:SendTime>

    </ap:Header>

    <ap:ProductID>

      <ct:Model>

        <ct:Series>34AP</ct:Series>

        <ct:ModelNo>013780</ct:ModelNo>

     ..............

      ..............

   <ap:Object>

        <ap:Parameter schemaVersion="2.5" Code="DDA">

          <dc:Value>

            <tp:Blob>mbQAEAgBTgKQEBAX4KJJYABAIASL0AA==</tp:Blob>

          </dc:Value>

        </ap:Parameter>

.........

........

在這里我需要從 ct:ModelNo 和 tp:Blob 中提取值


from pyspark.sql.types import *

from pyspark.sql.functions import udf

import xml.etree.ElementTree as ET

# List of namespaces to be used:

ns = {'ap' : 'http://example.com/productinfo/1_6', 

'ct':'http://example.com/commontypes/1_0', 

'dc':'http://example.com/datacontent/1_0', 

'tp':'http://aexample.com/prmvalue/1_0' 

    }


parsed_model = (lambda x:    ET.fromstring(x).find('ap:ParameterInfo/ap:ProductID/ct:Model/ct:ModelNo', ns).text)

udf_model = udf(parsed_model)

parsed_model_df = df.withColumn('ModelNo', udf_Model('XMLMsg'))

同樣對于具有 blob 值的節點,可以編寫類似的函數,但節點的路徑將是:'ap:ParameterInfo/ap:Object/ap:Parameter[@Code="DDA"]/dc:Value/tp:Blob'


這對我有用,我能夠在 pyspark DF 中添加所需的值。歡迎提出任何建議以使其變得更好。謝謝你!


查看完整回答
反對 回復 2023-05-23
  • 1 回答
  • 0 關注
  • 168 瀏覽
慕課專欄
更多

添加回答

舉報

0/150
提交
取消
微信客服

購課補貼
聯系客服咨詢優惠詳情

幫助反饋 APP下載

慕課網APP
您的移動學習伙伴

公眾號

掃描二維碼
關注慕課網微信公眾號