我的Spark程序读取一个包含gzip压缩的编码64字符串的文件。我需要对其进行解码和解压缩。 我使用Spark的unbase64函数进行解码,并生成了字节数组。
bytedf=df.withColumn("unbase",unbase64(col("value")) )
在Spark中是否有任何可用于解压字节码的Spark方法?
我的Spark程序读取一个包含gzip压缩的编码64字符串的文件。我需要对其进行解码和解压缩。 我使用Spark的unbase64函数进行解码,并生成了字节数组。
bytedf=df.withColumn("unbase",unbase64(col("value")) )
在Spark中是否有任何可用于解压字节码的Spark方法?
def decompress(ip):
bytecode = base64.b64decode(x)
d = zlib.decompressobj(32 + zlib.MAX_WBITS)
decompressed_data = d.decompress(bytecode )
return(decompressed_data.decode('utf-8'))
decompress = udf(decompress)
decompressedDF = df.withColumn("decompressed_XML",decompress("value"))
我有一个类似的情况,在我的情况下,我这样做:
from pyspark.sql.functions import col,unbase64,udf
from gzip import decompress
bytedf=df1.withColumn("unbase",unbase64(col("payload")))
decompress_func = lambda x: decompress(x).decode('utf-8')
udf_decompress = udf(decompress_func)
df2 = bytedf.withColumn('unbase_decompress', udf_decompress('unbase'))
使用Base64的Spark示例-
import base64
.
.
#decode base 64 string using map operation or you may create udf.
df.map(lambda base64string: base64.b64decode(base64string), <string encoder>)
阅读 这里 以获取详细的 Python 示例。