Pyspark Pypark使用键计算值的发生率

我有一个(键、值)对表单的列表: x=[(('cat','dog),('a','b')),(('cat','dog'),('a','b')),(('mouse','rat'),('e','f'))] 我想计算每个值元组与键元组一起出现的次数 期望输出: [(('cat','dog'),('a','b',2)),(('mouse','rat'),('e','f',1))] 一个有效的解决办法是: xs=sc.parallelize(x) xs=xs.groupByKey() xs=xs.map

Pyspark reduce中的意外错误

在pyspark中查找reduce的最大值时,我得到了以下意外结果 agg.reduce(lambda a,b : a if a > b else b ) 我的样本数据是 (u'2013-10-17', 80325.0) (u'2014-01-01', 68521.0) (u'2013-11-10', 83691.0) (u'2013-11-14', 149289.0) (u'2013-11-18', 94756.0) (u'2014-01-30', 126171.0) 结果是 (

Pyspark:收集给定数据帧列中的所有键

我是个初学者。我试图收集呈现特定列的所有键,不同的行具有不同的键值对 |-- A: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) A ID name: 'Peter', age:'25'. 5 name: 'John', country:'US

如何在PySpark Dataframe show中设置显示精度

调用.show时,如何在PySpark中设置显示精度 考虑以下示例: 从数学导入sqrt 导入pyspark.sql.f函数 data=zip maplambda x:sqrtx,范围100105, maplambda x:sqrtx,范围200205 df=sqlCtx.createDataFramedata[col1,col2] 在df.columns.中为c选择[f.avgc.aliasc] 哪些产出: +---------+---------+ |col1 | col2| +------

Pyspark 了解dstream.saveAsTextFiles()行为

我正在使用spark streaming api来熟悉它。我正在实现一个单词计数问题,在这个问题中,我监听一个流,在x秒后找到一个累积的单词计数,并将其输出到文本文件。因此,在所有转换之后,当我使用DStream的saveAsTextFiles()函数将输出打印到文件时,我会得到奇数输出 我希望它每x秒创建一个文本文件,并将最新结果转储到该文件中。我希望文件名是文档中提到的前缀时间戳后缀。但是,我得到的是一个每隔x秒命名一次的文件夹,它的名称与我期望的文件名称相同,并且在该文件夹中是包含我的结果

将嵌套列表转换为数据帧:Pyspark

我试图通过以下链接中的答案将嵌套列表转换为Dataframe 但我得到了这个错误: --------------------------------------------------------------------------- FileNotFoundError Traceback (most recent call last) <ipython-input-147-780a8d7196df> in <module>

Pyspark Spark:如何将多行转换为具有多列的单行?

注意:这只是一个简单的数据示例。与一支真正的板球队相比是没有意义的 我有一个JSON文件,如下所示: { "someID": "a5cf4922f4e3f45", "payload": { "teamID": "1", "players": [ { "type": "Batsman", "name": "Amar", "address": { "state": "Gujarat"

将df.show()的内容保存为pyspark中的字符串

我想将此输出捕获为变量中的字符串。如何实现它?有一个内部/私有函数返回与相同的字符串。show()打印: from pyspark.sql import SparkSession sc = SparkSession.builder.getOrCreate() prsn = sc.read.format("csv").option("delimiter", ",").option("header", "true").option("inferSchema", "true").load("C:/U

PySpark:合并数据帧,其中一个值(来自第一个数据帧)位于其他两个值(来自第二个数据帧)之间

我需要在一个标识符和条件上合并两个数据帧,其中一个数据帧中的日期介于另一个数据帧中的两个日期和另一列的groupby(计算总和)之间 数据帧A具有日期(“日期”)、编号(“编号”)和ID(“ID”): 数据帧B具有Id(“Id”)、fromdate(“fromdate”)和todate(“todate”): 现在我需要合并id和date上的这两个数据帧,然后将所有数字相加。 例如: 考虑数据文件B中的第四行,对于ID 102,在这些日期之间,我们有两个对应的行(行→3,4)从数据文件AM中通过计

Pyspark 在固定另一列中的值时,查找数据帧中一列的所有可能组合

我需要创建一个由作者和电影组成的图形网络。至少参与过一部电影的作者应该被联系起来。我已经创建了包含作者信息的vertice数据框。我无法创建显示此连接的边缘数据框。我有以下数据帧: author_ID | movie_ID nm0000198 | tt0091954 nm0000198 | tt0468569 nm0000198 | tt4555426 nm0000354 | tt0134119 nm0000354 | tt0091954 nm0000721 | tt0091954 我想以某种

pyspark生成特定列的行哈希,并将其添加为新列

我正在使用spark 2.2.0和pyspark2 我已经创建了一个数据帧df,现在尝试添加一个新列“rowhash”,它是数据帧中特定列的sha2哈希 例如,假设df有以下列:(第1列,第2列,…,第10列) 我需要在一个新的列“rowhash”中使用sha2((第2列| | |第3列| | |第4列| |第8列),256) 目前,我尝试使用以下方法: 1) 使用了hash()函数,但由于它提供整数输出,因此没有多大用处 2) 尝试使用sha2()函数,但失败 假设columnarray有我需

如何使用pyspark在jupyter笔记本中引用deltalake表

我正在尝试开始使用DeltaLakes使用Pyspark 为了能够使用deltalake,我在AnacondaShell提示符上调用pyspark,如下所示- pyspark — packages io.delta:delta-core_2.11:0.3.0 这是来自deltalake的参考资料- 在anacondashell提示符下,delta-lake的所有命令都可以正常工作 在jupyter笔记本上,引用deltalake表会产生错误- df_advisorMetrics.write.m

Pyspark 对列列表应用条件的数据帧筛选

如果列表中的任何字符串列为空,我想筛选pyspark数据帧 df = df.where(all([col(x)!='' for x in col_list])) ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions. 您可以使用functools中的red

Pyspark 如何在EMR笔记本中安装.jar依赖项?

我正在运行一个EMR笔记本(平台:AWS,笔记本:jupyter,内核:PySpark)。 我需要安装一个.jar依赖项(sparkdl)来处理一些映像 使用Spark submit,我可以使用: spark-submit --packages databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11 spark = (SparkSession .config('spark.jars.packages', 'databri

无法在pyspark中导入pyarrow

我正在尝试使用pyarrow和pyspark。然而,当我试图执行 import pyarrow 我收到以下错误 In [1]: import pyarrow --------------------------------------------------------------------------- ImportError Traceback (most recent call last) <ipython-in

Pyspark 将大型表从Oracle导入HDFS时出错:';org.apache.spark.shuffle.FetchFailedException:读取错误或源代码被截断';

我一直在尝试将两个巨大的表从Oracle DB复制到HDFS中,我使用PySpark和JDBC从源代码读取表,并将表保存为配置单元分区表 我已经成功地将这些表复制并保存到HDFS中:直接从JDBC读取到Hive分区表 这种方法的问题在于,它会在HDFS中的每个分区上创建大量的小文件。因此,为了避免这种情况,我尝试在写入HDFS之前对从JDBC读取的数据进行重新分区,如下所示: partition_cols = ["col1", "col2"] df = spark.read \ .for

Pyspark dataframe:从混合类型列中删除decimal

我正在研究一个pyspark数据帧,它有一列具有混合类型的数据。如下- Id 1234.0 A234 F456 345.0 567 45.0 K231 我想把小数部分从任何地方去掉。下面应该是这样的- Id 1234 A234 F456 345 567 45 K231 无法将其转换为int类型,因为它包含混合数据。我们如何实现它使用regexp\u replace删除尾部的.0-我认为您可以使用类似(?

Pyspark与Jupyter的集成

我已经在我的机器上安装了Anaconda(python 2.7版),并用“PYSPARK\u DRIVER\u python=jupyter”和“PYSPARK\u DRIVER\u python\u OPTS=“notebook”启动了jupyter笔记本,我正在连接到jupyter笔记本,但也无法运行“print”命令。当我运行命令时,它将进入下一行,但不显示输出,并且打印不会以彩色突出显示 我已经在我的windows机器(独立模式)中安装了pyspark并在命令提示符下运行,它工作正常……

Pyspark数据帧类型错误:应为字符串或缓冲区

我正在Pyspark中为现有数据帧创建一个新的coulmn,方法是搜索一个已归档的“script”并返回match作为新列的条目 import re as re def sw_fix(data_str): if re.compile(r'gaussian').search(data_str): cleaned_str = 'gaussian' elif re.compile(r'gromacs').search(data_str):

Pyspark-在groupByKey之后,根据键计算不同的值?

我想根据键找到多少不同的值,例如,假设我有 x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 2), ("a", 2)]) 我已经完成了groupByKey的使用 sorted(x.groupByKey().map(lambda x : (x[0], list(x[1]))).collect()) x.groupByKey().mapValues(len).collect() 输出将由like [('a', [1, 1, 2])

Pyspark 过滤掉hivecontext.sql中的空字符串和空字符串

我正在使用pyspark和hivecontext.sql,我想从数据中过滤掉所有空值和空值 因此,我使用简单的sql命令首先过滤掉空值,但它不起作用 我的代码: hiveContext.sql("select column1 from table where column2 is not null") 但是它在没有表达式“where column2不为null”的情况下工作 错误: Py4JavaError: An error occurred while calling o577.showS

Pyspark Pypark连接表

我是Pypark的新手。我有'Table A'和'Table B',我需要将两者连接起来才能得到'Table C'。有人能帮我吗 我正在使用数据帧 我不知道如何以正确的方式将这些表连接在一起 表A: +--+----------+-----+ |id|year_month| qt | +--+----------+-----+ | 1| 2015-05| 190 | | 2| 2015-06| 390 | +--+----------+-----+ +---------+--

Pyspark StructType不能接受对象?

我如何解决这个问题 rdd.collect() //['3e866d48b59e8ac8aece79597df9fb4c'...] rdd.toDF() //Can not infer schema for type: <type 'str'> myschema=StructType([StructField("col1", StringType(),True)]) rdd.toDF(myschema).show() rdd.collect()/['3e866d48b5

Pyspark 文本挖掘:使用Word2Vec对有毒评论的错误预测

我有一个包含句子和布尔列(0或1)的数据集来分类评论的类型(有毒|严重|有毒|淫秽|威胁|侮辱|身份|仇恨)。 您可以在此处下载数据集: 我用空格过滤单词,只保留有用的单词,我保留了:形容词、副词、动词和名词使用此功能: def filter_words(words) : vec = [] conditions = ('ADV','NOUN','ADJ','VERB') for token in nlp(words): if not token.is_stop and toke

聚合pyspark中的一个热编码特征

我对python很有经验,但对pyspark完全陌生。我有一个dataframe,它包含大约50万行,具有几个分类特性。对于每个功能,我都有一个热编码功能。下面是一个简化但具有代表性的代码示例 从pyspark.ml.feature导入StringIndexer,OneHotEncoder 从pyspark.ml导入管道 df = sc.parallelize([ (1, 'grocery'), (1, 'drinks'), (1, 'bakery'),

如何在PySpark中合并多个RDD

我想使用一个键将多个RDD合并成一个RDD。有没有一种有效的方法来代替多次加入 例如: Rdd_1 = [(0, a), (1, b), (2, c), (3, d)] Rdd_2 = [(0, aa), (1, bb), (2, cc), (3, dd)] Rdd_3 = [(0, aaa), (1, bbb), (2, ccc), (3, ddd)] 我认为输出应该是这样的 Rdd = [(0, a, aa, aaa), (1, b, bb, bbb), (2, c, cc, ccc),

如何在不使用StandardScaler的情况下标准化PySpark中的列?

看起来这应该行得通,但我有错误: mu=平均值DF[输入] sigma=stddevdf[输入] dft=df.withColumnoutput,df[输入]-mu/sigma pyspark.sql.utils.AnalysisException:分组表达式序列为 空,并且''user`'不是聚合函数。包 '将'sumresponse'转换为双平均'sumresponse''/ stddev_sampCAST`sumsresponse`与中的`scaled`'相同 窗口函数或在第一个或第一个值

动态传递查询字符串以在PySpark数据帧方法selectExpr()中选择列

我动态生成一个查询字符串,如下所示,并将其传递给selectExpr() 根据文件 选择expr(*expr): 投影一组SQL表达式并返回一个新的数据帧。 这是接受SQL表达式的select()的一个变体 问题在于变量“queryString”被视为单个字符串,而不是三个单独的列(这是正确的)。错误如下: :org.apache.spark.sql.catalyst.parser.ParseException: ==SQL== “类别id为类别id”、“类别部门id为类别dpt\U id”、

orderBy如何影响Pyspark数据框架中的Window.partitionBy?

我通过一个例子来解释我的问题: 假设我们有一个数据帧,如下所示: original_df = sc.createDataFrame([('x', 10,), ('x', 15,), ('x', 10,), ('x', 25,), ('y', 20,), ('y', 10,), ('y', 20,)], ["key", "price"] ) original_df.show() 输出: +---+-----+ |key|price| +---+-----+ | x| 10| | x|

Pyspark java.net.SocketTimeoutException:接受超时

我有以下代码: a) 生成本地Spark实例: # Load data from local machine into dataframe from pyspark.sql import SparkSession spark = SparkSession.builder.appName("Basic").master("local[*]").config("spark.network.timeout","50s").config("spark.executor.heartbeatInterval

PySpark:一段时间后并行度降低了?

我正在AWS胶水上使用PySpark。当我查看我的工作指标时,通常如下所示: 请注意,执行人的数量在一段时间后会下降,这是为什么?而且,洗牌是尖锐的,这是因为数据处理——在获得数据之后,需要很长时间吗?我怀疑我的工作中有些地方不是最优的。它依靠一台机器来完成大部分工作。但它是哪一部分呢?有什么建议吗 我的代码: import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions fro

pyspark数据管道使用中间结果

在pyspark中,我会对数据帧执行连续操作,并希望从中间结果中获得输出。但它总是需要相同的时间,我想知道它是否缓存过任何东西?不同的问题是,使用中介结果的最佳实践是什么?在dd.compute(df.amount.max(),df.amount.min())中,将找出需要缓存和计算的内容。pyspark中是否有类似的版本 在下面的示例中,当它到达print()时,它会执行3x吗 df_purchase = spark.read.parquet("s3a:/example/location")[

字符“^在pyspark 2.0中按箭头键时,按[A”;“;^[B”;“;^[C”;“;^[D”;]

在pyspark 2.0中按箭头键时,我得到字符。 我在Cent OS上的虚拟机上安装了spark 1.6和spark 2.0。spark 1.6没有问题 由于我在pyspark 2.0中使用Python2.7,“pip2.7安装readline”不起作用。 我试过了 但这并不能解决问题。这些是ANSI转义序列。^[代表转义字符(ASCII 27),下面包括序列的类型和参数。请阅读更多有关它们的信息。head-2/path/to/SPARK version-*/bin/pyspark的输出是什么

pyspark中有没有一种方法可以将文件从远程位置快速复制到本地

我正在使用mget参数使用lftp从远程位置复制文件。将50个xml文件从sftp机器复制到本地Unix机器大约需要2分钟。我希望能够复制20k个文件。XML文件约为15kb。dataframe df_文件包含我要复制的所有XML文件的列表 我用20000个文件尝试了下面的代码,似乎需要几个小时才能用这些文件创建一个数据帧 for row in df_files.tolist(): print row cmd_p1 = """lftp sftp://userna

Pyspark 如何将文本文件从Databricks笔记本上传到FTP

我试图找到一个解决办法,但一无所获。我在这方面是新手,所以如果你知道解决方案,请帮助我。 谢谢 在Databricks中,您可以使用下面描述的任何一种方法访问存储在ADL中的文件。 有三种访问Azure Data Lake存储Gen2的方法: 使用服务主体和OAuth 2.0将Azure Data Lake Storage Gen2文件系统装载到DBFS 直接使用服务主体 直接使用Azure Data Lake Storage Gen2存储帐户访问密钥 将文件系统中的文件当作本地文件装载和访问的

计算PySparkSQL联接中每个不同值在列中出现的次数

我使用pysparksql将两个表连接在一起,一个包含带有经度和纬度的犯罪地点数据,另一个包含带有相应经度和纬度的邮政编码 我想弄清楚的是如何统计每个邮政编码内发生的犯罪数量。我是PySpark的新手,我的SQL已经生锈了,所以我不确定自己哪里出了问题 我曾尝试使用COUNTDISTINCT,但这只是给出了不同邮政编码的总数 mySchema = StructType([StructField("Longitude", StringType(),True), StructField("Latit

Pyspark 如何将名称拆分为不同的列

如何在pyspark中将全名拆分为不同的列 输入CSV: Name,Marks Sam Kumar Timberlake,83 Theo Kumar Biber,82 Tom Kumar Perry,86 Xavier Kumar Cruse,87 FirstName,MiddleName,LastName,Marks Sam,Kumar,Timberlake,83 Theo,Kumar,Biber,82 Tom,Kumar,Perry,86 Xavier,Kumar,Cruse,87 输出

Pyspark-是否为每个键添加缺少的值?

我有一个Pyspark数据框,其中包含一些非唯一的键键,以及一些列数字和值 对于大多数键,数字列从1到12,但对于其中一些键,数字(例如,我们有数字[1,2,5,9])。我想添加缺少的行,这样对于每个键,我们就可以用最后看到的值填充1-12范围内的所有数字 所以这张桌子 key number value a 1 6 a 2 10 a 5 20 a 9 25 我想去 key

Pyspark-udf中的更新列表

是否有可能更新udf中的列表/变量 让我们考虑一下这个场景: studentsWithNewId = [] udfChangeStudentId = udf(changeStudentId, IntegerType()) def changeStudentId(studentId): if condition: newStudentId = computeNewStudentId() // this function is based on studentsWith

Pyspark-如何复制/复制行?

我需要从数据帧中“克隆”或“复制”/“复制”每一行 我什么也没发现,我只知道我需要使用explode 例如: ID - Name 1 John 2 Maria 3 Charles 输出: ID - Name 1 John 1 John 2 Maria 2 Maria 3 Charles 3 Charles 谢谢您可以使用数组重复和分解(Spark2.4+) 对于重复的: from pyspark.sql import f

Pyspark 在功能和时间上聚合数据

我有一个10分钟间隔的pyspark数据帧,如何在一个分类特征和2小时的时间内聚合它,然后计算其他两个特征的平均值和第三个特征的第一个值 在pyspark中,我的示例数据如下所示。我想按“ind”和“date”的2小时时间分组,然后计算“sal”的平均值和“imp”的第一个值 from pyspark import SparkContext from pyspark.sql import SQLContext sc = SparkContext.getOrCreate() sqlContext

为什么pyspark只能从根目录查找本地配置单元元存储?

我有一个关于delta lake的hive metastore支持的问题, 我已经使用以下配置在独立spark会话上定义了元存储 pyspark --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

Pyspark-基于列表或词汇表创建列

我是Pyspark的新手,我有一个包含客户id和文本的数据框,其中包含一个关联的值 +------+-----+------+ |id |text |value | +------+-----+------+ | 1 | Cat| 5| | 1 | Dog| 4| | 2 | Oil| 1| 我想基于关键字列表分析文本列,并创建一列,告诉我关键字是否在文本字段中,并提取相关值,预期结果如下 List_keywords = ["D

计算Pyspark数据帧中的运行总数,并在出现情况时中断循环

我有一个spark数据框,其中我需要根据当前行和前一行基于col_x值的金额总和来计算运行总数。当列中出现负数时,我应该打破以前记录的运行总计,并从当前行开始执行运行总计 样本数据集: 预期输出应如下所示: 如何使用pyspark通过数据帧实现这一点?我希望在实际场景中,您将有一个时间戳列来对数据进行排序,我使用带有zipindex的行号对数据进行排序,作为这里的解释基础 from pyspark.sql.window import Window import pyspark.sql.fun

如何在Pyspark中将Int列转换为字符串?

因为我是Pyspark的初学者,有人能帮我把整型列转换成字符串吗 这是我在Aws Athena中的代码,我需要将其转换为pyspark数据帧 case when A.[HHs Reach] = 0 or A.[HHs Reach] is null then '0' when A.[HHs Reach] = 1000000000 then '*' else cast(A.[HHs Reach] as varchar) end as [HHs Reach

如何计算pyspark数据帧中值的条件概率?

我想通过pyspark中列类型的值计算ratings列中评级('A','B','C')的条件概率,而不进行收集 输入: company model rating type 0 ford mustang A coupe 1 chevy camaro B coupe 2 ford fiesta C sedan 3 ford focus A

  1    2   3   4   5   6  ... 下一页 最后一页 共 34 页