2 Answers

Answer 1
Read the CSV file as plain text, split each line on the delimiter, and count the elements.
df = spark.read.text('test.csv')
df.show(10, False)
+-------------------------------+
|value |
+-------------------------------+
|Col1,Col2,Col3,Col4 |
|Value11,Value12,Value13,Value14|
|Value21,Value22,Value23,Value24|
+-------------------------------+
import pyspark.sql.functions as F
df2 = df.withColumn('count', F.size(F.split('value', ',')))
df2.show(10, False)
+-------------------------------+-----+
|value |count|
+-------------------------------+-----+
|Col1,Col2,Col3,Col4 |4 |
|Value11,Value12,Value13,Value14|4 |
|Value21,Value22,Value23,Value24|4 |
+-------------------------------+-----+
df2.groupBy().agg(F.min('count'), F.max('count')).show(10, False)
+----------+----------+
|min(count)|max(count)|
+----------+----------+
|4 |4 |
+----------+----------+
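Since the goal is to spot the malformed rows themselves, the derived count column can also be filtered directly. A minimal sketch, assuming the expected field count is 4 as in the sample above:

# keep only rows whose field count differs from the expected 4
df2.filter(F.col('count') != 4).show(10, False)

Note that spark.read.text does not carry the original line numbers, so this shows which rows are malformed but not where they sit in the file.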

Answer 2
Since you want to know which lines are erroneous, the only way is to loop over the file:
In [18]: erroneous_lines = []
In [19]: with open(r'C:\Users\abaskaran\Desktop\mycsv.txt') as fd:
...: for line_num, line in enumerate(fd,1):
...: if len(line.split(',')) != 4:
...: erroneous_lines.append((line_num, line))
In [20]: erroneous_lines
Out[20]:
[(5, 'Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14\n'),
(6, 'Value21,Value22,Value23\n')]
The erroneous_lines list will contain tuples of the line number and the actual content of each line that does not have all the expected values.
I modified the CSV content as below, just for testing:
Col1,Col2,Col3,Col4
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24Value11,Value12,Value13,Value14
Value21,Value22,Value23
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
Value11,Value12,Value13,Value14
Value21,Value22,Value23,Value24
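A caveat on line.split(','): if a field can itself contain a comma inside quotes, the split will miscount the fields. The standard library's csv module respects quoting; a minimal sketch of the same loop using csv.reader (the file path is a placeholder):

import csv

erroneous_lines = []
with open('mycsv.txt', newline='') as fd:  # placeholder path
    # enumerate records starting at 1; a quoted field may span lines,
    # so this is a record number rather than a physical line number
    for rec_num, row in enumerate(csv.reader(fd), 1):
        if len(row) != 4:  # expected number of fields
            erroneous_lines.append((rec_num, row))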