我有一个(键、值)对表单的列表:
x=[(('cat','dog),('a','b')),(('cat','dog'),('a','b')),(('mouse','rat'),('e','f'))]
我想计算每个值元组与键元组一起出现的次数
期望输出:
[(('cat','dog'),('a','b',2)),(('mouse','rat'),('e','f',1))]
一个有效的解决办法是:
xs=sc.parallelize(x)
xs=xs.groupByKey()
xs=xs.map
在pyspark中查找reduce的最大值时,我得到了以下意外结果
agg.reduce(lambda a,b : a if a > b else b )
我的样本数据是
(u'2013-10-17', 80325.0)
(u'2014-01-01', 68521.0)
(u'2013-11-10', 83691.0)
(u'2013-11-14', 149289.0)
(u'2013-11-18', 94756.0)
(u'2014-01-30', 126171.0)
结果是
(
我是个初学者。我试图收集呈现特定列的所有键,不同的行具有不同的键值对
|-- A: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
A ID
name: 'Peter', age:'25'. 5
name: 'John', country:'US
调用.show时,如何在PySpark中设置显示精度
考虑以下示例:
从数学导入sqrt
导入pyspark.sql.f函数
data=zip
maplambda x:sqrtx,范围100105,
maplambda x:sqrtx,范围200205
df=sqlCtx.createDataFramedata[col1,col2]
在df.columns.中为c选择[f.avgc.aliasc]
哪些产出:
+---------+---------+
|col1 | col2|
+------
我正在使用spark streaming api来熟悉它。我正在实现一个单词计数问题,在这个问题中,我监听一个流,在x秒后找到一个累积的单词计数,并将其输出到文本文件。因此,在所有转换之后,当我使用DStream的saveAsTextFiles()函数将输出打印到文件时,我会得到奇数输出
我希望它每x秒创建一个文本文件,并将最新结果转储到该文件中。我希望文件名是文档中提到的前缀时间戳后缀。但是,我得到的是一个每隔x秒命名一次的文件夹,它的名称与我期望的文件名称相同,并且在该文件夹中是包含我的结果
我试图通过以下链接中的答案将嵌套列表转换为Dataframe
但我得到了这个错误:
---------------------------------------------------------------------------
FileNotFoundError Traceback (most recent call last)
<ipython-input-147-780a8d7196df> in <module>
标签: Pyspark
apache-spark-sqlpyspark-sql
注意:这只是一个简单的数据示例。与一支真正的板球队相比是没有意义的
我有一个JSON文件,如下所示:
{
"someID": "a5cf4922f4e3f45",
"payload": {
"teamID": "1",
"players": [
{
"type": "Batsman",
"name": "Amar",
"address": {
"state": "Gujarat"
我想将此输出捕获为变量中的字符串。如何实现它?有一个内部/私有函数返回与相同的字符串。show()打印:
from pyspark.sql import SparkSession
sc = SparkSession.builder.getOrCreate()
prsn = sc.read.format("csv").option("delimiter", ",").option("header", "true").option("inferSchema", "true").load("C:/U
我需要在一个标识符和条件上合并两个数据帧,其中一个数据帧中的日期介于另一个数据帧中的两个日期和另一列的groupby(计算总和)之间
数据帧A具有日期(“日期”)、编号(“编号”)和ID(“ID”):
数据帧B具有Id(“Id”)、fromdate(“fromdate”)和todate(“todate”):
现在我需要合并id和date上的这两个数据帧,然后将所有数字相加。
例如:
考虑数据文件B中的第四行,对于ID 102,在这些日期之间,我们有两个对应的行(行→3,4)从数据文件AM中通过计
我需要创建一个由作者和电影组成的图形网络。至少参与过一部电影的作者应该被联系起来。我已经创建了包含作者信息的vertice数据框。我无法创建显示此连接的边缘数据框。我有以下数据帧:
author_ID | movie_ID
nm0000198 | tt0091954
nm0000198 | tt0468569
nm0000198 | tt4555426
nm0000354 | tt0134119
nm0000354 | tt0091954
nm0000721 | tt0091954
我想以某种
标签: Pyspark
string-concatenationsha256
我正在使用spark 2.2.0和pyspark2
我已经创建了一个数据帧df,现在尝试添加一个新列“rowhash”,它是数据帧中特定列的sha2哈希
例如,假设df有以下列:(第1列,第2列,…,第10列)
我需要在一个新的列“rowhash”中使用sha2((第2列| | |第3列| | |第4列| |第8列),256)
目前,我尝试使用以下方法:
1) 使用了hash()函数,但由于它提供整数输出,因此没有多大用处
2) 尝试使用sha2()函数,但失败
假设columnarray有我需
我正在尝试开始使用DeltaLakes使用Pyspark
为了能够使用deltalake,我在AnacondaShell提示符上调用pyspark,如下所示-
pyspark — packages io.delta:delta-core_2.11:0.3.0
这是来自deltalake的参考资料-
在anacondashell提示符下,delta-lake的所有命令都可以正常工作
在jupyter笔记本上,引用deltalake表会产生错误-
df_advisorMetrics.write.m
如果列表中的任何字符串列为空,我想筛选pyspark数据帧
df = df.where(all([col(x)!='' for x in col_list]))
ValueError: Cannot convert column into bool: please use '&' for 'and', '|' for 'or', '~' for 'not' when building DataFrame boolean expressions.
您可以使用functools中的red
我正在运行一个EMR笔记本(平台:AWS,笔记本:jupyter,内核:PySpark)。
我需要安装一个.jar依赖项(sparkdl)来处理一些映像
使用Spark submit,我可以使用:
spark-submit --packages databricks:spark-deep-learning:1.5.0-spark2.4-s_2.11
spark = (SparkSession
.config('spark.jars.packages', 'databri
我正在尝试使用pyarrow和pyspark。然而,当我试图执行
import pyarrow
我收到以下错误
In [1]: import pyarrow
---------------------------------------------------------------------------
ImportError Traceback (most recent call last)
<ipython-in
标签: Pyspark
Hive
apache-spark-sqlparquethive-partitions
我一直在尝试将两个巨大的表从Oracle DB复制到HDFS中,我使用PySpark和JDBC从源代码读取表,并将表保存为配置单元分区表
我已经成功地将这些表复制并保存到HDFS中:直接从JDBC读取到Hive分区表
这种方法的问题在于,它会在HDFS中的每个分区上创建大量的小文件。因此,为了避免这种情况,我尝试在写入HDFS之前对从JDBC读取的数据进行重新分区,如下所示:
partition_cols = ["col1", "col2"]
df = spark.read \
.for
我正在研究一个pyspark数据帧,它有一列具有混合类型的数据。如下-
Id
1234.0
A234
F456
345.0
567
45.0
K231
我想把小数部分从任何地方去掉。下面应该是这样的-
Id
1234
A234
F456
345
567
45
K231
无法将其转换为int类型,因为它包含混合数据。我们如何实现它使用regexp\u replace删除尾部的.0-我认为您可以使用类似(?
我已经在我的机器上安装了Anaconda(python 2.7版),并用“PYSPARK\u DRIVER\u python=jupyter”和“PYSPARK\u DRIVER\u python\u OPTS=“notebook”启动了jupyter笔记本,我正在连接到jupyter笔记本,但也无法运行“print”命令。当我运行命令时,它将进入下一行,但不显示输出,并且打印不会以彩色突出显示
我已经在我的windows机器(独立模式)中安装了pyspark并在命令提示符下运行,它工作正常……
我正在Pyspark中为现有数据帧创建一个新的coulmn,方法是搜索一个已归档的“script”并返回match作为新列的条目
import re as re
def sw_fix(data_str):
if re.compile(r'gaussian').search(data_str):
cleaned_str = 'gaussian'
elif re.compile(r'gromacs').search(data_str):
我想根据键找到多少不同的值,例如,假设我有
x = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("b", 2), ("a", 2)])
我已经完成了groupByKey的使用
sorted(x.groupByKey().map(lambda x : (x[0], list(x[1]))).collect())
x.groupByKey().mapValues(len).collect()
输出将由like
[('a', [1, 1, 2])
我正在使用pyspark和hivecontext.sql,我想从数据中过滤掉所有空值和空值
因此,我使用简单的sql命令首先过滤掉空值,但它不起作用
我的代码:
hiveContext.sql("select column1 from table where column2 is not null")
但是它在没有表达式“where column2不为null”的情况下工作
错误:
Py4JavaError: An error occurred while calling o577.showS
我是Pypark的新手。我有'Table A'和'Table B',我需要将两者连接起来才能得到'Table C'。有人能帮我吗
我正在使用数据帧
我不知道如何以正确的方式将这些表连接在一起
表A:
+--+----------+-----+
|id|year_month| qt |
+--+----------+-----+
| 1| 2015-05| 190 |
| 2| 2015-06| 390 |
+--+----------+-----+
+---------+--
我如何解决这个问题
rdd.collect() //['3e866d48b59e8ac8aece79597df9fb4c'...]
rdd.toDF() //Can not infer schema for type: <type 'str'>
myschema=StructType([StructField("col1", StringType(),True)])
rdd.toDF(myschema).show()
rdd.collect()/['3e866d48b5
标签: Pyspark
text-miningrandom-forestword2vecspacy
我有一个包含句子和布尔列(0或1)的数据集来分类评论的类型(有毒|严重|有毒|淫秽|威胁|侮辱|身份|仇恨)。
您可以在此处下载数据集:
我用空格过滤单词,只保留有用的单词,我保留了:形容词、副词、动词和名词使用此功能:
def filter_words(words) :
vec = []
conditions = ('ADV','NOUN','ADJ','VERB')
for token in nlp(words):
if not token.is_stop and toke
我对python很有经验,但对pyspark完全陌生。我有一个dataframe,它包含大约50万行,具有几个分类特性。对于每个功能,我都有一个热编码功能。下面是一个简化但具有代表性的代码示例
从pyspark.ml.feature导入StringIndexer,OneHotEncoder
从pyspark.ml导入管道
df = sc.parallelize([
(1, 'grocery'),
(1, 'drinks'),
(1, 'bakery'),
我想使用一个键将多个RDD合并成一个RDD。有没有一种有效的方法来代替多次加入
例如:
Rdd_1 = [(0, a), (1, b), (2, c), (3, d)]
Rdd_2 = [(0, aa), (1, bb), (2, cc), (3, dd)]
Rdd_3 = [(0, aaa), (1, bbb), (2, ccc), (3, ddd)]
我认为输出应该是这样的
Rdd = [(0, a, aa, aaa), (1, b, bb, bbb), (2, c, cc, ccc),
看起来这应该行得通,但我有错误:
mu=平均值DF[输入]
sigma=stddevdf[输入]
dft=df.withColumnoutput,df[输入]-mu/sigma
pyspark.sql.utils.AnalysisException:分组表达式序列为
空,并且''user`'不是聚合函数。包
'将'sumresponse'转换为双平均'sumresponse''/
stddev_sampCAST`sumsresponse`与中的`scaled`'相同
窗口函数或在第一个或第一个值
给定以下数据帧:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test").getOrCreate()
df = spark.createDataFrame([['a',1],['b', 2],['a', 3]], ['category', 'value'])
df.show()
+--
标签: Pyspark
apache-spark-sqlpyspark-sql
我动态生成一个查询字符串,如下所示,并将其传递给selectExpr()
根据文件
选择expr(*expr):
投影一组SQL表达式并返回一个新的数据帧。
这是接受SQL表达式的select()的一个变体
问题在于变量“queryString”被视为单个字符串,而不是三个单独的列(这是正确的)。错误如下:
:org.apache.spark.sql.catalyst.parser.ParseException:
==SQL==
“类别id为类别id”、“类别部门id为类别dpt\U id”、
我通过一个例子来解释我的问题:
假设我们有一个数据帧,如下所示:
original_df = sc.createDataFrame([('x', 10,), ('x', 15,), ('x', 10,), ('x', 25,), ('y', 20,), ('y', 10,), ('y', 20,)], ["key", "price"] )
original_df.show()
输出:
+---+-----+
|key|price|
+---+-----+
| x| 10|
| x|
我有以下代码:
a) 生成本地Spark实例:
# Load data from local machine into dataframe
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Basic").master("local[*]").config("spark.network.timeout","50s").config("spark.executor.heartbeatInterval
标签: Pyspark
apache-spark-sqlaws-glue
我正在AWS胶水上使用PySpark。当我查看我的工作指标时,通常如下所示:
请注意,执行人的数量在一段时间后会下降,这是为什么?而且,洗牌是尖锐的,这是因为数据处理——在获得数据之后,需要很长时间吗?我怀疑我的工作中有些地方不是最优的。它依靠一台机器来完成大部分工作。但它是哪一部分呢?有什么建议吗
我的代码:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
fro
在pyspark中,我会对数据帧执行连续操作,并希望从中间结果中获得输出。但它总是需要相同的时间,我想知道它是否缓存过任何东西?不同的问题是,使用中介结果的最佳实践是什么?在dd.compute(df.amount.max(),df.amount.min())中,将找出需要缓存和计算的内容。pyspark中是否有类似的版本
在下面的示例中,当它到达print()时,它会执行3x吗
df_purchase = spark.read.parquet("s3a:/example/location")[
在pyspark 2.0中按箭头键时,我得到字符。
我在Cent OS上的虚拟机上安装了spark 1.6和spark 2.0。spark 1.6没有问题
由于我在pyspark 2.0中使用Python2.7,“pip2.7安装readline”不起作用。
我试过了
但这并不能解决问题。这些是ANSI转义序列。^[代表转义字符(ASCII 27),下面包括序列的类型和参数。请阅读更多有关它们的信息。head-2/path/to/SPARK version-*/bin/pyspark的输出是什么
标签: Pyspark
random-forestdecision-treeapache-spark-mladaptive-threshold
目前,已在具有以下分布的不平衡数据集上构建了随机森林分类器(来自Spark ML):
+-------+----------------+
| score | n_observations |
+-------+----------------+
| 0 | 256741 |
| 1 | 13913 |
| 2 | 7632 |
| 3 | 15877 |
| 4 |
我正在使用mget参数使用lftp从远程位置复制文件。将50个xml文件从sftp机器复制到本地Unix机器大约需要2分钟。我希望能够复制20k个文件。XML文件约为15kb。dataframe df_文件包含我要复制的所有XML文件的列表
我用20000个文件尝试了下面的代码,似乎需要几个小时才能用这些文件创建一个数据帧
for row in df_files.tolist():
print row
cmd_p1 = """lftp sftp://userna
我试图找到一个解决办法,但一无所获。我在这方面是新手,所以如果你知道解决方案,请帮助我。
谢谢 在Databricks中,您可以使用下面描述的任何一种方法访问存储在ADL中的文件。
有三种访问Azure Data Lake存储Gen2的方法:
使用服务主体和OAuth 2.0将Azure Data Lake Storage Gen2文件系统装载到DBFS
直接使用服务主体
直接使用Azure Data Lake Storage Gen2存储帐户访问密钥
将文件系统中的文件当作本地文件装载和访问的
我使用pysparksql将两个表连接在一起,一个包含带有经度和纬度的犯罪地点数据,另一个包含带有相应经度和纬度的邮政编码
我想弄清楚的是如何统计每个邮政编码内发生的犯罪数量。我是PySpark的新手,我的SQL已经生锈了,所以我不确定自己哪里出了问题
我曾尝试使用COUNTDISTINCT,但这只是给出了不同邮政编码的总数
mySchema = StructType([StructField("Longitude", StringType(),True), StructField("Latit
标签: Pyspark
pyspark-sqlpyspark-dataframes
如何在pyspark中将全名拆分为不同的列
输入CSV:
Name,Marks
Sam Kumar Timberlake,83
Theo Kumar Biber,82
Tom Kumar Perry,86
Xavier Kumar Cruse,87
FirstName,MiddleName,LastName,Marks
Sam,Kumar,Timberlake,83
Theo,Kumar,Biber,82
Tom,Kumar,Perry,86
Xavier,Kumar,Cruse,87
输出
我有一个Pyspark数据框,其中包含一些非唯一的键键,以及一些列数字和值
对于大多数键,数字列从1到12,但对于其中一些键,数字(例如,我们有数字[1,2,5,9])。我想添加缺少的行,这样对于每个键,我们就可以用最后看到的值填充1-12范围内的所有数字
所以这张桌子
key number value
a 1 6
a 2 10
a 5 20
a 9 25
我想去
key
是否有可能更新udf中的列表/变量
让我们考虑一下这个场景:
studentsWithNewId = []
udfChangeStudentId = udf(changeStudentId, IntegerType())
def changeStudentId(studentId):
if condition:
newStudentId = computeNewStudentId() // this function is based on studentsWith
我需要从数据帧中“克隆”或“复制”/“复制”每一行
我什么也没发现,我只知道我需要使用explode
例如:
ID - Name
1 John
2 Maria
3 Charles
输出:
ID - Name
1 John
1 John
2 Maria
2 Maria
3 Charles
3 Charles
谢谢您可以使用数组重复和分解(Spark2.4+)
对于重复的:
from pyspark.sql import f
我有一个10分钟间隔的pyspark数据帧,如何在一个分类特征和2小时的时间内聚合它,然后计算其他两个特征的平均值和第三个特征的第一个值
在pyspark中,我的示例数据如下所示。我想按“ind”和“date”的2小时时间分组,然后计算“sal”的平均值和“imp”的第一个值
from pyspark import SparkContext
from pyspark.sql import SQLContext
sc = SparkContext.getOrCreate()
sqlContext
我有以下rdd
[('K', ' M'),
('K', ' H'),
('M', ' K'),
('M', ' E'),
('H', ' F'),
('B', ' T'),
('B', ' H'),
('E', ' K'),
('E', ' H'),
('F', ' K'),
('F', ' H'),
('F', ' E'),
('A', ' Z')]
我想过滤掉rdd中存在(y,x)的元素(x,y)。在我的示例中,输出应该如下所示:
[(K,M),
(H,F)]
我们可以像下面那样指定编码吗
df.write.format(“text”).option(“encoding”,“UTF-8”).saveasTable您可以---如果您想创建外部表,也可以指定路径
df.write.option("path", "/some/path").option("encoding", "UTF-8").saveAsTable("t")
标签: Pyspark
delta-lakehive-metastore
我有一个关于delta lake的hive metastore支持的问题,
我已经使用以下配置在独立spark会话上定义了元存储
pyspark --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"
我是Pyspark的新手,我有一个包含客户id和文本的数据框,其中包含一个关联的值
+------+-----+------+
|id |text |value |
+------+-----+------+
| 1 | Cat| 5|
| 1 | Dog| 4|
| 2 | Oil| 1|
我想基于关键字列表分析文本列,并创建一列,告诉我关键字是否在文本字段中,并提取相关值,预期结果如下
List_keywords = ["D
我有一个spark数据框,其中我需要根据当前行和前一行基于col_x值的金额总和来计算运行总数。当列中出现负数时,我应该打破以前记录的运行总计,并从当前行开始执行运行总计
样本数据集:
预期输出应如下所示:
如何使用pyspark通过数据帧实现这一点?我希望在实际场景中,您将有一个时间戳列来对数据进行排序,我使用带有zipindex的行号对数据进行排序,作为这里的解释基础
from pyspark.sql.window import Window
import pyspark.sql.fun
因为我是Pyspark的初学者,有人能帮我把整型列转换成字符串吗
这是我在Aws Athena中的代码,我需要将其转换为pyspark数据帧
case when A.[HHs Reach] = 0 or A.[HHs Reach] is null then '0'
when A.[HHs Reach] = 1000000000 then '*'
else cast(A.[HHs Reach] as varchar) end as [HHs Reach
标签: Pyspark
apache-spark-sqlprobabilitypyspark-dataframes
我想通过pyspark中列类型的值计算ratings列中评级('A','B','C')的条件概率,而不进行收集
输入:
company model rating type
0 ford mustang A coupe
1 chevy camaro B coupe
2 ford fiesta C sedan
3 ford focus A
1 2 3 4 5 6 ...
下一页 最后一页 共 34 页