2 回答

TA貢獻1830條經驗 獲得超9個贊
使用zipWithIndex
。
pyspark 與 Scala 不同。
其他答案對性能不利 - 使用單個執行器。zipWithIndex
是narrow transformation
這樣,它可以按partition
.
在這里,您可以進行相應的定制:
from pyspark.sql.types import StructField
from pyspark.sql.types import StructType
from pyspark.sql.types import StringType, LongType
import pyspark.sql.functions as F
df1 = spark.createDataFrame([ ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4'), ('abc'),('2'),('3'),('4') ], StringType())
schema = StructType(df1.schema.fields[:] + [StructField("index", LongType(), True)])
rdd = df1.rdd.zipWithIndex()
rdd1 = rdd.map(lambda row: tuple(row[0].asDict()[c] for c in schema.fieldNames()[:-1]) + (row[1],))
df1 = spark.createDataFrame(rdd1, schema)
df1.show()
返回:
+-----+-----+
|value|index|
+-----+-----+
|? abc|? ? 0|
|? ? 2|? ? 1|
|? ? 3|? ? 2|
|? ? 4|? ? 3|
|? abc|? ? 4|
|? ? 2|? ? 5|
|? ? 3|? ? 6|
|? ? 4|? ? 7|
|? abc|? ? 8|
|? ? 2|? ? 9|
|? ? 3|? ?10|
|? ? 4|? ?11|
+-----+-----+

TA貢獻2021條經驗 獲得超8個贊
假設:這個答案基于以下假設: 的順序col_id應取決于age列。如果假設不成立,則其他建議的解決方案是問題評論中提到的zipWithIndex。zipWithIndex可以在此答案中找到 的示例用法。
建議的解決方案:您可以使用window帶有空partitionBy和行號的 a 來獲取預期的數字。
from pyspark.sql.window import Window
from pyspark.sql import functions as F
windowSpec = Window.partitionBy().orderBy(F.col('age').asc())
age = age.withColumn(
'col_id',
F.row_number().over(windowSpec)
)
添加回答
舉報