Create A New Column In Pyspark Dataframe By Applying A Udf On Another Column From This Dataframe
My data is the diamonds dataset:

+-----+-------+-----+-------+-----+-----+-----+----+----+----+
|carat|    cut|color|clarity|depth|table|price|   x|   y|   z|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+
...
Solution 1:
Modifying your solution
Your problem is that your udf explicitly looks for the globally defined df and does not use its size parameter in any way.
Try this:
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
@F.udf(StringType())
def bin_carat(s):
    # Map a carat value to its half-open bin label
    if 0 < s <= 1:
        return '[0,1)'
    elif 1 < s <= 2:
        return '[1,2)'
    elif 2 < s <= 3:
        return '[2,3)'
    elif 3 < s <= 4:
        return '[3,4)'
    elif 4 < s <= 5:
        return '[4,5)'
    elif s:
        return '[5, 6)'
diamonds.withColumn("carat_bin", bin_carat(diamonds['carat'])).show()
This results in (I modified your inputs slightly so that one can see the different cases):
+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+
|carat| cut|color|clarity|depth|table|price| x| y| z|carat_bin|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| [0,1)|
| 1.34|Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| [1,2)|
| 2.45| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| [2,3)|
| 3.12|Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| [3,4)|
| 5.6| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| [5, 6)|
+-----+-------+-----+-------+-----+-----+-----+----+----+----+---------+
For your dataframe, just as expected.
Note that using spark.udf.register('carat_bin', carat_bin) seems to behave fundamentally differently and always led to an error for me.
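If you still want a SQL-registered version, one pattern that generally works is to register the plain Python function (rather than the already wrapped udf object) and call it through an expression. This is only a minimal sketch, assuming an active SparkSession named spark; the helper name carat_bin_plain and the registered name carat_bin_sql are hypothetical:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Plain Python function, not wrapped by the @F.udf decorator (hypothetical name)
def carat_bin_plain(s):
    if 0 < s <= 1:
        return '[0,1)'
    elif 1 < s <= 2:
        return '[1,2)'
    elif 2 < s <= 3:
        return '[2,3)'
    elif 3 < s <= 4:
        return '[3,4)'
    elif 4 < s <= 5:
        return '[4,5)'
    elif s:
        return '[5, 6)'

# Register it under a SQL-visible name, then call it inside an expression
spark.udf.register('carat_bin_sql', carat_bin_plain, StringType())
diamonds.withColumn('carat_bin', F.expr('carat_bin_sql(carat)')).show()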
Using pandas udfs
If you use PySpark 2.3 or above, there is an even simpler way to achieve this with pandas udfs. Just have a look at the following:
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

@pandas_udf(StringType(), PandasUDFType.SCALAR)
def cut_to_str(s):
    # pd.cut labels each carat value with its bin; the result is cast back to strings
    return pd.cut(s, bins=[0, 1, 2, 3, 4, 5],
                  labels=['[0,1)', '[1,2)', '[2,3)', '[3,4)', '[4,5)']).astype(str)
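As a side note, on Spark 3.x the PandasUDFType.SCALAR form is deprecated in favour of Python type hints; a sketch of the equivalent definition under that newer API (same binning logic) would look like this:

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StringType

# Spark 3.x style: the scalar pandas udf is inferred from the Series -> Series type hints
@pandas_udf(StringType())
def cut_to_str(s: pd.Series) -> pd.Series:
    return pd.cut(s, bins=[0, 1, 2, 3, 4, 5],
                  labels=['[0,1)', '[1,2)', '[2,3)', '[3,4)', '[4,5)']).astype(str)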
Use this in the same fashion as the previously defined udf:
diamonds.withColumn("carat_bin", cut_to_str(diamonds['carat'])).show()
And it will result in the exact same dataframe as shown above.
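If you want a quick look at how the rows spread across the new bins, one simple follow-up (just a usage sketch) is to aggregate on the derived column:

# Count how many rows fall into each carat bin
diamonds.withColumn("carat_bin", cut_to_str(diamonds['carat'])) \
        .groupBy("carat_bin").count().orderBy("carat_bin").show()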