Apache spark 用于hadoop小文件的spark rdd

我有很多很小的xml文件,每个文件有1条记录。我正在使用spark进行一些转换并加载到DB中。当我在sc.WholetextFileArgs[0]中使用本地文件时;工作正常,正如我看到的日志 像 1 record transformed.. 1 record transformed.. 对于每个分区,我都会加载到数据库中,并得到如下日志 2000 records loaded 0 rejected. 但是当从hadoop加载带有sc.wholeTextFilesargs[0]的文件时;其显

Apache spark 我做错了什么?

我正在使用spark处理数据,它可以处理一天的数据(40G),但无法处理一周的数据: import pyspark import datetime import operator sc = pyspark.SparkContext() sqc = pyspark.sql.SQLContext(sc) sc.union([sqc.parquetFile(hour.strftime('.....')) .map(lambda row:(row.id, row.foo))

Apache spark 了解Spark';s缓存

我想了解Spark的缓存是如何工作的 这是我的天真理解,如果我遗漏了什么,请告诉我: val rdd1 = sc.textFile("some data") rdd1.cache() //marks rdd1 as cached val rdd2 = rdd1.filter(...) val rdd3 = rdd1.map(...) rdd2.saveAsTextFile("...") rdd3.saveAsTextFile("...") 在上述情况下,rdd1将仅从磁盘(例如HDFS)加载一

Apache spark 在一组机器上运行Spark

我想在四台计算机上运行Spark,我阅读了使用Mesos、Thread和SSH在集群上运行Spark的理论,但我想要一个实用的方法和教程。这些机器的操作系统是Mac和Ubuntu。我已经使用Scala在IntelliJIDEA上编写了代码 有人能帮我吗?如果有问题要求我们推荐或查找书籍、工具、软件库、教程或其他非现场资源,则会导致堆栈溢出,因为这些问题往往会吸引固执己见的答案和垃圾邮件。相反,请描述问题以及迄今为止为解决该问题所做的工作。Apache Spark页面上有一些不错的文档:

Apache spark Pyspark纱线簇模式

是否有任何方法可以在不使用spark submit脚本的情况下使用纱线集群模式运行pyspark脚本?我需要这样做,因为我将把这段代码集成到django web应用程序中 当我尝试在纱线簇模式下运行任何脚本时,出现以下错误: org.apache.spark.SparkException: Detected yarn-cluster mode, but isn't running on a cluster. Deployment to YARN is not supported directly

Apache spark Spark对非迭代分析仍然有利吗?

Spark使用内存计算和缓存来减少复杂分析的延迟,但这主要用于“迭代算法”, 如果我需要进行更基本的分析,比如说,每个元素都是一组数字,我想寻找标准偏差小于“x”的元素,与常规集群计算(没有内存计算)相比,是否仍然可以减少延迟?假设我在每种情况下都使用相同的商品硬件。它与没有使用这些额外机制的顶级排序框架相匹配,因此我认为这是足够的理由。但是,您也可以运行流媒体、图形或机器学习,而无需切换档位。然后,添加一个选项,在任何可能的情况下都应该使用数据帧,这样就可以得到比我所知道的任何其他框架都要好的

Apache spark 如何将整行传递给UDF-Spark数据帧过滤器

我正在为具有大量内部结构的复杂JSON数据集编写过滤函数。传递单个列太麻烦了 因此,我宣布了以下UDF: val records:DataFrame = = sqlContext.jsonFile("...") def myFilterFunction(r:Row):Boolean=??? sqlc.udf.register("myFilter", (r:Row)=>myFilterFunction(r)) 直觉上我认为它会这样工作: records.filter("myFilter(*

Apache spark 无法从spark beeline查看配置单元orc表的数据

我已经创建了一个orc蜂巢表,如下所示: 创建表forest41 id int,将按id聚集的字符串键入2 存储为orc TBLProperty“transactional”的存储桶=true 在表41中插入值1,“红色”,2,“白色”,3,“黑色” 现在,当我试图从spark beeline查看数据时:它既不向我显示任何数据,也不引发任何异常 以下是我运行的查询: 从default.forest40 limit 10中选择* 但是在spark作业控制台中:它显示了与上述查询相关的一个作业-跳过

Apache spark 如何根据两个条件筛选()pairRDD

如果我有两个过滤条件,一个用来测试密钥,另一个用来测试值(代码的一部分),那么我如何过滤我的RDD对呢 JavaPairRDD filtering = pairRDD1.filter((x,y) -> (x._1.equals(y._1))&&(x._2.equals(y._2))))); 您不能对此使用常规筛选器,因为它一次检查一个项目。您必须将多个项目相互比较,并检查要保留的项目。下面是一个仅保留重复项目的示例: val items = List(1, 2, 5, 6

Apache spark spark数据帧中模式(最常见元素)的聚合

在Spark中,我使用了一个库,我应该为它提供聚合,然后该库执行一系列连接/分组,并在最后调用聚合。我试图避免违反封装(尽管必要时我可以),只需使用聚合调用此方法(传统上是sum或min等) 在本例中,我尝试运行模式,但我不确定如何在聚合中运行该模式。这里有一个Spark(2.1.0)UDAF来计算给定列的统计模式: package org.anish.spark.mostcommonvalue import org.apache.spark.sql.Row import org.apache

Apache spark 使用spark高效过滤有序文件

假设我有一个大的日志文件,其格式如下: Timestamp, text Timestamp, text Timestamp, text Timestamp, text Timestamp, text Timestamp, text 日志文件已按时间戳排序。 如果我以spark作为数据帧读取文件,然后过滤t1和t2之间的时间戳, 然后他检查每个记录的时间戳是否在t1和t2之间,这需要很多时间 但是,有没有一种方法可以告诉spark df已经订购,然后它就会知道它只需要查找第一个和最后一个时间戳,

Apache spark Spark中的RDD样品

RDD示例如何在spark中工作?其不同参数的功能是什么,即样本替换、分数、种子 我在网上找不到任何关于“替换”和“种子”参数的相关信息。请举例说明。分数和种子很容易猜测-它们是您希望在样本中看到的元素的分数,即.5的样本将为您提供包含一半元素的初始RDD样本。种子是随机数生成器种子。这很重要,因为您可能希望能够为测试硬编码相同的种子,以便在测试中始终获得相同的结果,但在prod代码中,将其替换为以毫秒为单位的当前时间或来自良好熵源的随机数 替换抽样是一种谷歌搜索方式,例如。 简言之,如果使用替

Apache spark 如何使用groupBy将行收集到地图中?

上下文 sqlContext.sql(s""" SELECT school_name, name, age FROM my_table """) 询问 根据上表,我想按学校名称分组,并将姓名、年龄收集到Map[String,Int] 例如-伪代码 val df = sqlContext.sql(s""" SELECT school_name, age FROM my_table GROUP BY school_name """) ------------------------ schoo

Apache spark Spark如何处理与时间相关的JDBC数据?

我正在尝试通过每日ETL Spark作业将S3上的Spark数据库与旧的Oracle数据库同步。我试图了解Spark在连接到Oracle之类的RDS以获取数据时会做什么 它是否只抓取Spark向DB发出请求时的数据(即,如果它在2/2 17:00:00从Oracle DB抓取数据,它将只抓取到该时间点的数据)?本质上说,2/2 17:00:01的任何新数据或更新都不会从数据提取中获得?好吧,这取决于具体情况。通常,除非应用程序和数据库设计明确保证,否则您必须假设这种行为是不确定的 默认情况下,S

Apache spark spark jobserver无法使用spark 2.0构建

我正在尝试使用spark-2.0运行spark jobserver 我从github存储库克隆了branch spark-2.0-preview。我遵循部署指南,但当我尝试使用bin/server\u deploy.sh部署服务器时。我发现编译错误: Error: [error] /spark-jobserver/job-server-extras/src/main/java/spark/jobserver/JHiveTestLoaderJob.java:4: cannot find symb

Apache spark 在spark中添加JAR以使用sql UDF

我想使用我的库中定义的自定义自定义自定义项。为此,我使用了以下代码: %spark2 import org.apache.spark.sql.functions.year val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc) sqlContext.sql(s"ADD JAR /usr/hdp/current/spark-client/lib/myLib.jar") val df = sqlContext.sql("se

Apache spark I';I’我试图在创建新文件后立即从目录中的文件中读取数据。“实时”;“文件流式处理”;

我目前正在学习spark流媒体。我试图在创建新文件后立即从目录中的文件读取数据。实时“文件流”。我得到下面的错误。有人能给我一个解决方案吗 import org.apache.spark.SparkContext._ import org.apache.spark.SparkContext import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} object

Apache spark Spark ML:使用ChiSqSelector进行特征选择后在培训中出现问题

我是新手。我正在研究一个分类模型,并希望使用ChiSqSelector选择模型培训的重要功能。但是,当我使用ChiSqSelector选择的功能进行训练时,它会抛出以下错误: “IllegalArgumentException:u”功能0被标记为标称(分类),但它没有指定的值数。” 有趣的是,当我使用任何基于树的算法时,我得到了上面提到的错误。例如,天真的偏见和逻辑回归,我没有得到错误 当我在spark文档中使用示例代码中提供的数据时,我发现了相同的结果。可使用spark 2.1.1文档中的代码

Apache spark 使用数据帧的行间火花余弦距离

我必须计算每行之间的余弦距离,但我不知道如何优雅地使用Spark API数据帧。其思想是计算每行(项目)的相似性,并通过比较行之间的相似性来获取前10个相似性。-->这是项目推荐系统的需要 我读到的所有关于它的内容都涉及到计算列的相似性 也许有人会说,使用PySpark Dataframe的API或RDD可以优雅地计算行之间的余弦距离,还是我必须手动计算 这只是一些代码来显示我打算做什么 def cosineSimilarity(vec1, vec2): return vec1.dot(

Apache spark 在分区内的多个列上进行Spark聚合,而不进行洗牌

我正在尝试在多个列上聚合数据帧。我知道聚合所需的一切都在分区内——也就是说,不需要洗牌,因为聚合的所有数据都是分区的本地数据 如果我有 val sales=sc.parallelize(List( ("West", "Apple", 2.0, 10), ("West", "Apple", 3.0, 15), ("West", "Orange", 5.0, 15), ("South", "Orange", 3

Apache spark SnappyData:在build.sbt和import语句中放置什么以便使用SnappySession

我正在努力开发一种“Hello World”类型的SnappyData应用程序,我希望能够在IntelliJ中构建和运行该应用程序。到目前为止,我的集群是本地机器上的一个定位器、一个lead和一个服务器。我只想连接到它,序列化一点微不足道的数据或者一个数据帧,然后看看它是否正常工作 文件上说我应该可以做这样的事情: val spark: SparkSession = SparkSession .builder() .appName("SnappyTest") .master("xxx

Apache spark 未找到SparkConf类

我试图用spark分析facebook网络。当我创建Spark Conf的对象时,它会给我以下错误。 线程“main”java.lang.NoClassDefFoundError中出现异常:org/apache/spark/SparkConf 我使用的是scala 2.11版,它应该可以正常工作。在我的intelliJ IDEA中,还加载了scala sdk 2.12。 相同的代码为:- import org.apache.spark.SparkConf import org.apache.sp

Apache spark AWS EMR Spark应用程序-CPU和内存利用率低

我正在AWS EMR(3个节点*m4.4X大型集群-每个节点16vCPU和64G RAM)上运行我的Spark流媒体应用程序(Spark 2.2.1、EMR 5.11、Scala)的2个副本 在内置的EMR集群监控(Ganglia)中,我看到集群的CPU利用率低于30%,内存使用量不超过32GB,可用容量约为200GB,网络也远未达到100%。但是,应用程序几乎无法在批处理间隔内完成批处理 以下是我使用客户端模式将应用程序的每个副本提交给Master时使用的参数: --master yarn -

Apache spark 如何计算通过率&;插入spark数据框中的列?

我正在将HDFS中的数据读取到spark数据帧中。根据状态值,我需要在质量列中插入通过/失败/中止的值1/0/-1,或者我们是否有可能计算通过率 df=sparkSession.read.json(hdfsPath) +-----------+---------+ |身份|身份| +-----------+---------+ |Tsz3650419c |通过| |Tsz3650420c |故障| |Tsz3650421c |通过| |TSZ365042C |通过| |Tsz3650423c |

Apache spark 按表达式与2GB限制进行Spark数据帧重新分区

我们都知道spark不支持大于2GB的分区。(发生这种情况时,我们得到java.lang.IllegalArgumentException:Size超过了Integer.MAX_VALUE) 那么,当使用df.repartition(num,$“col”)对表达式进行分区时,我们如何增加分区的数量呢?这里,如果col散列到同一分区,则num的任何值都不会阻止突破2GB限制 数据帧不能像使用RDD那样使用自定义哈希分区器。为了使用Range partitionner做了一些努力,但在这里没有帮助

Apache spark 基于需要外部API调用的现有列创建新Spark dataframe列的最佳方法是什么?

我在一个基于Python的Jupyter笔记本中使用了一个数据帧。我想根据现有列的内容添加一个附加列,其中新列的内容是通过对原始列运行外部API调用派生的 我尝试的解决方案是使用基于Python的UDF。第一个单元格包含如下内容: def analysisold_列: 新列=myapi.analyzetext=旧列 返回新列 分析\u udf=udf分析 第二个单元格是: df2=df1。带Column2,分析 df2.选择“col2”。显示=5 我的数据框相对较大,大约有70000行,其中co

Apache spark 将spark中的标识符切换为假名数据集

我有一个数据帧,如: +---+-----+ | id|value| +---+-----+ | a| 1| | a| 2| | b| 1| | b| 3| +---+-----+ val df = Seq(("a", 1), ("a", 2), ("b", 1), ("b", 3)).toDF("id", "value") 如何有效地切换/旋转ID。注意,这里我不想要散列,我明确地想要旋转标识符。如果没有自连接,如何在spark中高效地实现这一点?可能是一些R

Apache spark Spark dataframe计算行最小值

我试图将几列的最小值放入一个单独的列中。(创建min列)。操作非常简单,但我找不到合适的函数: A B最小值 1.2.1 2.11 31 1 4 1 非常感谢你的帮助 您可以在pyspark中使用该函数: pyspark.sql.functions中的导入最少 df.withColumn('min',least('A','B')).show() #+---+---+---+ #|A | B | min| #+---+---+---+ #| 1| 2| 1| #| 2| 1| 1| #

Apache spark Apache Spark流式处理大量csv文本文件,其标题包含重要信息

我需要将大量的csv文件作为源文件进行流式处理,每个文件都包含一个标题,其中包含用于对后面的其余数据进行分类的重要信息 构建流式解决方案的最佳方法是什么?该解决方案将包含ApacheSpark分布式处理系统下每行的标题数据 问题可能是,在文件处理被拆分的情况下,任何执行器都可以拾取头。我将分离消息创建和消息处理,其中消息表示您想要的头和行的组合 例如,您可以使用Kafka构建此类消息,将它们发布到主题,并在pyspark应用程序中引用该主题来处理它们

Apache spark 将列拆分为多行

我正在使用sparkml,我在CSV中有如下数据。第一个是电影名称,下面的值是收到的各种评级。评级的数量会有所不同 例如。 泰坦尼克号,9,10,8,6,9 冷冻,8,8,8 汽车,6,7,8,5 我想把它们当礼物 泰坦尼克号,9 泰坦尼克号,10 泰坦尼克号,8 泰坦尼克号,6 冷冻,8 冷冻,8 冷冻,8 汽车,6 汽车,7 汽车,8辆 汽车,5辆 你知道怎么做吗。 我查看了Explode功能,但它在分隔符不同的场景中对我有所帮助,比如:泰坦尼克号,9 | 10 | 8 | 6 | 9。这里

Apache spark 为什么我要;spark shell:许可被拒绝”;火花设置错误?

我是Apache Spark的新手。我正在尝试将Apache Spark安装到我的Macbook上。 我从Apache spark官方网站下载文件“spark-2.4.0-bin-hadoop2.7”。 当我尝试运行./bin/spark shell或./bin/pyspark时,我得到了权限被拒绝错误 我只想在本地机器上运行spark。 我还尝试授予所有文件夹的权限,但没有帮助。 为什么我会出现这个错误?这应该可以解决您的问题chmod+x/Users/apple/spark-2.4.0-bi

Apache spark 为什么使用spark';s量化器分组不均匀?

我有一个数据集 使用spark 2.3.1的org.apache.spark.ml.feature.QuantileDiscretizer类对要素列进行分组,得到的模型分组结果不一致 最后一个数据包中反映的数据几乎是其他数据包的两倍,我在参数中设置了11个数据包,但实际上只获得了10个数据包 请看下面的程序 import org.apache.spark.ml.feature.QuantileDiscretizer import org.apache.spark.ml.feature.Bucke

Apache spark 将传感器的数据存储到hdfs中

我正在从事一个项目,涉及使用HDFS存储和Spark计算 我需要将传感器的数据实时存储到HDFS中 例如,我有一个气象站,传感器每5秒生成一次数据(温度压力)。我想知道如何将这些数据实时存储在hdfs中将大量小文件直接写入hdfs可能会产生一些不良影响,因为这会影响主节点内存的使用,并且与批处理相比可能会导致较低的处理速度 您的任何传感器每月都会生成500k文件,因此,除非您的传感器数量非常有限,否则我建议您查看消息代理。ApacheKafka()是众所周知的一个,并且已经捆绑在一些Hadoop

Apache spark 从EMR中的Spark默认类路径中删除JAR

我在一个EMR步骤中执行一个spark submit脚本,该步骤将我的超级JAR作为主类,如 spark-submit \ .... --class ${MY_CLASS} "${SUPER_JAR_S3_PATH}" 。。。等 但是Spark在默认情况下加载jar文件:/usr/lib/Spark/jars/guice-3.0.jar,其中包含com.google.inject.internal.InjectorImpl,这个类也在我的超级jar中的guice-4.x j

Apache spark Spark-Worker没有';我没有足够的资源

我是Apache Spark和Cloudera Manager的新手。我在运行我的第一个Spark应用程序时遇到了一些问题 我的群集中有两个具有RAM和内核的虚拟机: 13.4G/4核作为主节点,节点01 4G/4核作为从节点,节点02 当我跑的时候 su-hdfs-c“spark shell--执行器内存1G--执行器内核1--num executors 1” 它表明 WARN cluster.YarnScheduler:初始作业未接受任何资源;检查集群UI以确保工作人员已注册并拥有足够的资

Apache spark pyspark中的行id等效于什么?

在传统的DWH过程中,我们在传统的RDBMS中基于rowid查找重复记录并跟踪这些重复记录 例如 select pkey_columns, max(rowdid) from table group by pkey_columns 将仅返回与max记录对应的重复记录。即使我们识别重复记录,这也有助于识别/跟踪记录 pySpark中是否有类似的版本?在dwh到pyspark dwh的翻译项目中如何处理这个问题?我建议您使用分析函数库,可能是 ROW_NUMBER() OVER( PARTITION

Apache spark 结构流和状态流中静态数据的周期刷新

我正在尝试使用spark结构化流媒体实现5分钟的批处理监控,其中从kafka读取并查找(1个大的和1个小的)差异静态数据集作为ETL逻辑的一部分,并调用rest API将最终结果发送到外部应用程序(在kafka的数十亿条记录中,只有不到100条记录在ETL后将输出到rest API) 如何在不重新启动整个流应用程序的情况下实现刷新静态查找?(StreamingQueryListener使用StreamingQueryManager.addListener方法拥有我们自己的逻辑,通过Streami

Apache spark 如何为具有多个分区的Kafka主题启动多个使用者?

我最近开始使用spark,并且必须处理一个案例,即我需要在spark中使用Kafka主题的多个分区。如何启动多个消费者?我是否需要使用相同的组id运行同一应用程序的多个实例?或者,在启动应用程序并在内部执行该任务时,我是否可以使用任何配置?传递--num executors,并且每个executor使用多个内核将在Spark中创建多个使用者线程 每个使用者线程都映射到单个分区 使总线程数等于总分区数,以最大限度地提高分布式吞吐量结构化流媒体?@thebluephantom是。当您使用Spark

Apache spark Spark结构化流:连接两个流时出现NullPointerException

当在Spark Structured Streaming 2.4.0中加入两个流时,我们在几个微批次中间歇性地遇到以下异常,作业失败。当我们搜索异常时,它说我们应该将所有可为null的字符串列转换为不可为null的。我们不知道怎么做。 注意:在执行联接操作之前,已对联接列执行Null检查 代码片段 val optionsMap1 = Map[String, String]("Path" -> "/path/to/source1", "maxFilesPerTrigger" -> "

Apache spark 如何判断我使用了哪个应用程序';我在星火中奔跑?

我使用齐柏林飞艇运行Spark查询,我们有Spark应用程序UI 但是,对于每个正在运行的应用程序,我们只有一个UI视图 我让齐柏林飞艇在Hadoop上运行。我想知道我的应用程序ID应该是什么 有没有办法从齐柏林飞艇笔记本中找到这个问题?使用sc.applicationId获取当前作业的应用程序id 使用Spark会话: spark.sparkContext.applicationId 这简直太棒了。关于如何理解齐柏林飞艇应用程序界面,您有什么建议吗?@Cauder,这是Zeppelin

Apache spark Spark如何查看RDD每个部分中的数据

我现在希望自己测试repartition()和coalesce()的行为,特别是在numpartion保持不变的不太常见的情况下,我希望看到使用相同分区号的重新分区调用仍然会对所有数据进行完全洗牌。然后我意识到我缺乏检查每个分区的确切内容的方法。我只是使用一个瘫痪列表作为我的RDD示例。是否有任何方法可以检查每个分区的内容,以便验证我的疑问? 哦,也许还有其他更新的API可以满足这个目标? 提前感谢。您可以使用 返回通过将每个分区内的所有元素合并到一个数组中而创建的RDD 例如,可以使用以下方法

Apache spark 在Spark数据框中将字符串数据类型列转换为MapType

我有一个如下所示的数据帧。我想将最后一列Trandata从String类型转换为MapType。输出应该与我在第二个表中显示的类似 我已经编写了udf,但它需要字符串并转换为Maptype,我很难用sql.row作为输入获得类似的输出:( 对于Spark 2.4+,可以将字符串转换为键值对,然后使用将键和值分隔为两个数组列,然后使用创建最终映射 df.withColumn(“entry”,split('TRANDATA,,')) .withColumn(“key”,expr(“transform

Apache spark 了解pyspark中的重新分区行为

我理解重新分配的行为。我读到,repartition(“colname”)返回200个分区,作为spark.sql.shuffle.partitions设置为200 在我的数据集中,country列有两个不同的值,但在使用df.repartition(“country”)重新分区数据集之后,我得到了一个分区。我正在运行这个示例。有人能帮我理解这个吗

  1    2   3   4   5   6  ... 下一页 最后一页 共 364 页