Apache spark Spark Streaming未将任务分发到群集上的节点

我有两个节点的独立集群用于spark流处理。下面是我的示例代码,它演示了我正在执行的过程 sparkConf.setMaster("spark://rsplws224:7077") val ssc=new StreamingContext() println(ssc.sparkContext.master) val inDStream = ssc.receiverStream //batch of 500 ms as i would like to have 1 sec latency v

Apache spark ';单元格';对象没有属性';iteritems';

我正在通过Spark的python API运行一个简单的示例: x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) def f(x): return x def add(a, b): return a + str(b) sorted(x.combineByKey(str, add, add).collect()) 在本地模式(Spark 1.0和1.1)下没有问题,但在群集模式下会发生错误。下面给出了一条问题回溯信息。在测试RDD函数cogro

Apache spark 使用JMS的Spark流-无API

是否有任何API/方法将Spark Streaming与JMS集成在一起。我能够与Kafka和Sockets集成,但无法与Jms队列或主题集成。我认为您应该尝试在spark中调用Receiver api。您需要创建自定义接收器 同时检查来自tathagat das的依靠,他是spark的贡献者 www.apache-spark-user-list.1001560.n3.nabble.com/spark-Streaming-and-JMS-td5371.html 如果您需要详细帮助,请告诉我我知

Apache spark 如何判断RDD是否将加载到ram?;

下面是一个例子 lines = spark.textFile("hdfs://...") errors = lines.filter(_.startsWith("ERROR")) errors.persist() 论文系统:“请注意,基本RDD行没有加载到RAM中。这是可取的,因为 错误消息可能只是数据的一小部分(小到足以放入 内存)“ 我的问题是如何判断RDD是否将加载到ram中?您的问题不正确。RDD将加载到RAM中,如果您告诉引擎这样做,引擎不会抱怨您的代码。要使它只需将RDD标记为使用

Apache spark 无法使用Google Compute Engine上的Spark连接到主机

我正在谷歌计算引擎中通过“启动点击部署软件”功能尝试hadoop/spark集群 我已经创建了1个主节点和2个从节点,我可以在集群上启动spark shell,但是当我想启动spark shell时,我失败了 我宣布: ./bin/spark-shell --master spark://IP or Hostname:7077 我有一个stackTrace: 15/04/09 10:58:06 INFO AppClient$ClientActor: Connecting to master a

Apache spark Apache Spark广播变量不被重用

我在使用广播变量时会遇到一种奇怪的行为。每次使用广播变量时,每个节点都会复制一次内容,并且永远不会重复使用 下面是spark shell中的一个示例--master local[32]: (当然,这是无用且愚蠢的代码,但它确实显示了行为) 在我的例子中,我广播的变量是几百兆字节,由数百万个小对象(很少的哈希映射和向量)组成 每次我在使用它的RDD上运行一个操作时,我都会浪费几GB的内存,垃圾收集器越来越成为瓶颈 是设计为每次执行新闭包都重新广播变量,还是一个bug,我应该重用我的副本 为什么使用

Apache spark Spark从键值RDD获取键

如果我有一个具有键值对的RDD,并且我只想获取关键部分,那么最有效的方法是什么?非常简单yourRDD.keys() 类似地,您可以通过youRDD.values() 有关此和其他RDD转换和操作,请参见示例

Apache spark 简单火花应用中的错误

我正在运行一个简单的spark应用程序,它执行“word to vector”。这是我的代码(来自spark网站) import org.apache.spark_ 导入org.apache.spark.rdd_ 导入org.apache.spark.SparkContext_ 导入org.apache.spark.mllib.feature.{Word2Vec,Word2VecModel} 对象SimpleApp{ def main(参数:数组[字符串]){ val conf=new Spar

Apache spark Spark:为什么任务只分配给一名员工?

我是Apache Spark的新手,尝试在集群上运行一个简单的程序。问题是驱动程序将所有任务分配给一个工作者 我在两台计算机上以spark单机群集模式运行: 1-使用4个核运行主核和辅助核:1个用于主核,3个用于辅助核。Ip:192.168.1.101 2-仅运行具有4个内核的辅助进程:全部为辅助进程。Ip:192.168.1.104 代码如下: public static void main(String[] args) { SparkConf conf = new SparkConf

Apache spark “为什么火花会抛掷?”;SparkException:数据流尚未初始化";何时从检查点恢复?

我正在从HDFS检查点(例如ConstantInputDSTream)还原一个流,但我一直得到SparkException:尚未初始化 从检查点恢复时,我是否需要执行某些特定操作 我可以看到它想要设置DStream.zeroTime但是当流恢复时zeroTime是null。它无法还原,可能是因为它是IDK的私有成员。我可以看到还原流引用的StreamingContext确实有一个zeroTime值 initialize是一个私有方法,在StreamingContext.graph.start处调

Apache spark 通过amazon EMR上的spark submit添加postgresql jar

我尝试过spark submit,使用--driver类路径,使用--jars,也尝试过这个方法 在命令行中使用SPARK_类路径 SPARK_CLASSPATH=/home/hadoop/pg_jars/postgresql-9.4.1208.jre7.jar pyspark 我得到这个错误 Found both spark.executor.extraClassPath and SPARK_CLASSPATH. Use only the former. 但我无法添加它。如何添加postg

Apache spark 为已部署的Apache Spark提供配置单元支持

我需要在Spark SQL中使用特定于配置单元的功能,但是我必须使用已经部署的ApacheSpark实例,不幸的是,该实例中没有编译的配置单元支持 我需要做什么才能为我的工作提供蜂巢支持 我尝试使用该设置,但总是出现以下异常: 持久性:创建org.DataNucleus.properties.CorePropertyValidator类型的验证程序时出错 类“”的ClassLoaderResolver在创建时出错:{1} 及 org.datanucleus.exceptions.NucleusU

Apache spark Spark submit在使用配置单元表时抛出错误

我有一个奇怪的错误,我正在尝试将数据写入配置单元,它在spark shell中运行良好,但当我使用spark submit时,它抛出了默认错误中未找到的数据库/表 下面是我试图在spark submit中编写的代码,我使用的是spark 2.0.0的定制版本 val sqlContext = new org.apache.spark.sql.SQLContext(sc) sqlContext.table("spark_schema.iris_ori") 下面是我正在使用的命令 /home/e

Apache spark 从Jupyter/pyspark中确定Spark UI端口

我正在运行多台Jupyter笔记本电脑,每台都有自己的pyspark(Spark 1.6.1)内核。如何发现哪个Spark UI实例属于哪个内核?显然,我可以循环浏览打开的页面(,等等)并尝试解决它,但我希望有一种编程的方法来解决它 编辑:我没有对服务器的root访问权限…您可以使用nmap实用程序执行以下操作: sudo nmap -O sparky 它将显示sparky主机上所有打开的端口。这很难看,但它可以工作 import urllib sparkHost = "sparky" fo

Apache spark 确保hbase上的数据位置

是否有办法确保spark executors与Hbase区域服务器位于同一位置?horton works的Spark on HBase中提到如下: 我们假设Spark和HBase部署在同一个集群中,Spark执行器与区域服务器位于同一位置 有没有办法做到这一点? 如果我使用 sparkContext.newHadoopApi() 它能确保数据的局部性吗?我们在Splice Machine的经验是,对于在Spark中运行许多分析查询和压缩的大型查询和系统,我们可以实现适当的局部性(95%)。我们

Apache spark HiveException:未能创建spark客户端

1) 我创建了一个sql文件,我们在其中从两个不同的配置单元表收集数据,并插入到一个配置单元表中 2) 我们正在使用shell脚本调用此SQL文件 3) 样品火花设置: SET hive.execution.engine=spark; SET spark.master=yarn-cluster; SET spark.app.name="ABC_${hiveconf:PRC_DT}_${hiveconf:JOB_ID}"; --SET spark.driver.memory=8g; --SET s

Apache spark Spark增加纱线模式中的执行器数量

我在一个4节点集群上运行Spark over纱线。节点中每台机器的配置为128GB内存,每个节点24核CPU。我使用此命令运行Spark on spark-shell --master yarn --num-executors 19 --executor-memory 18g --executor-cores 4 --driver-memory 4g 但是Spark最多只能启动16个执行器。我将纱线中的最大vcore分配设置为80(在我拥有的94个芯线中)。因此,我的印象是,这将启动19个执行

Apache spark Apache Spark,sbt/assembly上出现错误

我试着在我本地的LinuxMint上安装ApacheSpark,但当我安装时 sbt组件 我得到如下错误: 如何解决这个问题 请从spark二进制文件夹中的图片中给出建议 这意味着您不需要安装它,只需运行bin/sparkshell、bin/sparksubmit即可使用它 如果您想编译它,需要下载源代码我在spark文件夹上运行了bin/spark shell,但出现如下错误:cst@cst-Ideapad-Z460~/spark-2.0.0-bin-hadoop2.7$bin/spark

Apache spark spark sql流mqtt错误的用户或密码

我试图将mqtt作为ApacheSpark中的流使用,使用的库是ApacheBahir spark sql流mqtt。 该库使用paho mqtt库 我使用的lib如下所示: val spark = SparkSession .builder .appName("MQTTStreamWordCount") .master("local[4]") .getOrCreate() import spark.implicits._ // Create DataFrame represe

Apache spark Insert overwrite语句在spark sql中的运行速度比在配置单元客户端中慢得多

spark版本:2.0.0 配置单元版本:2.0.1 我发现在spark sql或spark shell中运行的insert OVERRIDE语句比在hive客户端(我在apache-hive-2.0.1-bin/bin/hive中启动)中花费的时间要多得多,spark大约需要10分钟,而hive客户端只需要不到20秒。 这些是我采取的步骤 测试sql是: INSERT overwrite TABLE login4game partition(pt='mix_en',dt='2016-10-21

Apache spark Spark:不断从Cassandra读取数据

我已经通过和通过和链接 公平地说,Cassandra Spark integration目前没有提供任何现成的东西来不断地从Cassandra获取更新并将其流式传输到其他系统(如HDFS)上吗 我所说的连续,是指仅获取自Spark上次获取以来已更改(插入或更新)的表中的行。如果有太多这样的行,应该有一个选项来限制行的数量,并且后续的spark获取应该从它停止的地方开始。至少保证一次是可以的,但只要保证一次就可以了 如果不支持它,支持它的一种方法是在每个需要由storm查询的cassandra表中

Apache spark spark standalone是否支持Oozie工作流

我是工作流引擎的新手,我需要做一些工作。因此,我考虑使用ApacheOozie,我使用SparkStandalone作为集群管理器 但我阅读的大多数文档都只讨论了纱线上的oozie。我的问题是“spark standalone是否支持并推荐oozie工作流?” 如果是的话,你能分享一个同样的例子吗?或者,我也想知道使用fork 在spark中,不使用任何工作流引擎。除了cron,调度作业的行业标准方法是什么 Oozie的全部目的是在Hadoop集群上调度Hadoop作业。看起来不太适合你。您更喜

Apache spark 使用空值将JSON文本字段转换为RDD

我有一个cassandra表,有一个名为table的字段 [应用程序、ts、数据] 我在cassandra表的数据字段中有几个JSON对象,我需要独立地计算每个对象 val conf = new SparkConf().setAppName("datamonth") val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val rdd = sc.cassandraTable[(String, String, Str

Apache spark 无法导入org.apache.spark.streaming.kafka010

我正在制作一个Spark(2.2.0)流媒体项目,在Intelij IDEA 2017.2上使用Scala 2.11.11、SBT 0.13.15和Kafka 0.10.2.1。到目前为止,我已经能够导入其他流媒体模块,如org.apache.spark.streaming.StreamingContext和org.apache.spark.streaming.Seconds,但由于某些原因,我的项目无法识别org.apache.spark.streaming.kafka010.\uquot/c

Apache spark 从Spark ETL重置BigQuery表

我有个问题要问你。如果我有一个内置的ETL Databricks,它正在将数据加载到BigQuery中,但我希望在每次运行ETL之前删除BigQuery表,这可能吗?对不起,我是新手!谢谢 加载数据时,在configuration.load property of jobs.insert下有两个属性(以及许多其他属性)可用于控制加载到的表的内容和方式: [可选]指定目标表已存在时发生的操作 支持以下值: WRITE_TRUNCATE:如果表已经存在,BigQuery将覆盖表数据。 WRITE_

Apache spark Spark数据帧映射函数

我试图在数据帧的每一行上运行一个函数(在流媒体中)。此函数将包含scala代码和Spark dataframe api代码的组合。例如,我想从df中获取3个特性,并使用它们过滤第二个名为df2的数据帧。我的理解是,UDF无法实现这一点。现在我所有的过滤代码都工作得很好,没有能力将其应用到df的每一行 我的目标是能够做像这样的事情 val df1 = Seq(("Brian", 29, "0-A-1234")).toDF("name", "age", "client-ID") val df2 =

Apache spark 如何在Spark SQL中将多个列分解为行

我正在使用Spark SQL 2.2.0和DataFrame/DataSet API 我需要分解几列,每行一列 我有: +------+------+------+------+------+ |col1 | col2 | col3 | col4 | col5| +------+------+------+------+------+ |瓦尔11 |瓦尔21 |瓦尔31 |瓦尔41 |瓦尔51| |瓦尔12 |瓦尔22 |瓦尔32 |瓦尔42 |瓦尔52| +------+------+----

Apache spark 火花修剪镶木地板柱

我知道拼花地板只支持阅读用户选择的列。但是当我使用 dataframe.read.parquet().select(col*) 要读取数据,看起来仍然要读取整个文件。spark中有没有办法只读取选定的列块?“看起来仍在读取整个文件”-您是如何确定的?另外,请包括数据结构和Spark版本。在Spark web UI中,输入大小/读取IO是文件大小的总数。Spark版本为2.1.0.cloudera1,表是TPC-H中的行项目

Apache spark 如何知道我的数据是倾斜的?

将数据(假设表)传输到HDFS后,我不知道如何复制数据(哪个部分到哪个机器(节点)) 所以,运行sparksql查询有人说您可以向Spark提示我的数据是倾斜的 但我怎么知道我的数据是倾斜的,这样我就可以给spark提示了 当您在任何分布式系统(如HDFS)中加载数据时,您可以使用w.r.t分区键加载数据。在这种情况下,如果某个分区键值的记录数比其他值多,则数据分布不均匀。大多数情况下,即使数据稍微倾斜,spark也能很好地工作,但例如,在一个分区值中,您拥有90%的数据,而另一个分区值拥有10

Apache spark 谷歌星火';由于java.io.FileNotFoundException:/hadoop/thread/nm local dir/usercache/root/appcache,s Dataproc失败/

我已经通过齐柏林飞艇和Dataproc控制台在Dataproc上使用Spark/Hadoop好几个月了,但就在最近,我遇到了以下错误 Caused by: java.io.FileNotFoundException: /hadoop/yarn/nm-local-dir/usercache/root/appcache/application_1530998908050_0001/blockmgr-9d6a2308-0d52-40f5-8ef3-0abce2083a9c/21/temp_shuffl

Apache spark 如何在使用spark数据帧写入时自动计算numRepartition

当我试图将dataframe写入Hive拼花地板分区表时 df.write.partitionBy("key").mode("append").format("hive").saveAsTable("db.table") 它将在HDFS中创建大量块,每个块只有较小的数据大小 我理解它是如何进行的,因为每个spark子任务将创建一个块,然后向其中写入数据 我也明白,块的数量会提高Hadoop的性能,但在达到阈值后也会降低性能 如果我想自动设置numPartition,有人有好主意吗 numPar

Apache spark 在ApacheSpark中解析/查询异构CSV数据的最佳方法?

我有大量的日志数据,它们是半结构化的CSV数据。但是,每一行的列取决于它是什么类型的行(由特定列指示) 示例数据: 8/01/2018, person, 1, Bob, Loblaw 32 8/01/2018, person, 2, Roger, McRoger, 55 8/03/2018, dog, Bella, 9, 1 8/05/2018, person, 3, Charlie, McCharles, 23 8/07/2018, dog, Scout, 5, 3 此特定示例显示了一个半

Apache spark 空列的Spark Explode返回空行

我是Spark编程新手。我正在尝试用空行分解数据帧的列。简单来说,我认为explode函数可以为数组中的每个元素创建额外的行,但结果不同 我无法理解爆炸数据帧背后的逻辑。 有人能解释一下下面的例子吗。 我想了解这一结果的基本原理/原因。 为什么空数组在数据帧中被视为null //inputDataFrame +---+------+----------+ |age| name|occupation| +---+------+----------+ | []|Harish| developer|

Apache spark 如何在spark中替换漏掉的换行符

我有一个csv,没有被引用,我在下面添加了一个例子 新行转义为\,如第2行所示,是否有方法使用apache spark将其替换为其他字符 输入CSV Banana,23,Male,5,11,2017 Cat,32,Fe\ male,2,11,2017 Dragon,28,Male,1,11,2017 预期产出 Banana,23,Male,5,11,2017 Cat,32,Fe-male,2,11,2017 Dragon,28,Male,1,11,2017 注意:原始文件很大(大约40GB)

Apache spark 使用UDF计算两个向量之间的距离是否效率低下?

我在Spark中实现了一个分类算法,该算法涉及计算实例之间的距离。该实现使用数据帧(在可能的情况下使用原始SQL)。我将实例的特征转换为一个向量,这样我就可以应用一个定标器,并最终得到一个统一的模式,而不管我的数据集碰巧有多少个特征 据我所知,Spark SQL无法使用向量列进行计算。因此,为了计算实例之间的距离,我必须定义一个python函数并将其注册为UDF。但我看到了反对使用UDF的警告,因为数据帧引擎“无法优化UDF” 我的问题是: 在SQL中无法计算两个特征向量之间的距离(不使用UD

Apache spark 与配置单元相比,spark sql读取表的速度非常慢

我有一个配置单元表,大约有2500列,当我通过spark sql阅读它时,如下所示: val df = spark.sql("select * from dbName.tableName") 完成对配置单元表的读取大约需要3个小时,而我使用配置单元sql读取此表时,它只需几秒钟就可以完成 谁知道为什么spark sql和hive sql的性能如此不同?非常感谢 假设您使用的是Hive而不是Impala:Hive QL查询select*from table甚至不是一个仅映射的作业,而是一个边缘案

Apache spark Sparkyr spark_应用非常缓慢

Sparkyr spark_apply非常慢/根本没有响应。签入spark UI时,正在执行的阶段在utils.scala:204处收集。它正在执行0/1(1个正在运行)任务。应用spark_apply的数据帧有30个分区。此任务没有进展,为什么执行单个任务 library(sparklyr) library(dplyr) config=spark_config() config=c(config, list("spark.files"="hdfs:///bundle/packages.tar"

Apache spark 从拼花文件中获取单个示例行的最有效方法

我需要能够从以拼花格式存储的多个数据集中获取单个样本行。我不知道拼花文件是如何生成的(即,使用什么键优化存储)。此操作需要尽可能快速高效。问题在于,这些数据集中有许多大小为terrabytes,并被分割成数千个拼花地板文件。如果可能的话,我需要避免将整个数据集读入内存,而只是为了得到一行数据。 我知道spark方法,比如使用limit和take,但这些方法涉及通过加载整个数据集并限制返回值来创建spark dataframe/dataset。因此,我的问题是,是否有一种有效的方法来执行此任务,或

Apache spark Elasticsearch如何利用集群?

将单节点群集更改为多节点群集后,您是否需要采取任何措施来利用群集 例如,我们的应用服务器使用单个ip/port(我们单个节点的ip/port)来搜索文档。 此外,logstash、kibana、spark都指向单个ip/端口 我应该放置一个负载平衡器来利用多节点吗?或者我可以免费得到一些东西吗 我遵循以下docker文件(只是将网络模式更改为主机,并部署在多台主机上) 在Elasticsearch中,您不需要做任何特殊的事情来利用并行化。当集群中有一个包含多个分片的索引时,Elasticsear

Apache spark 将dataframe转换为rdd并丢失标题列

我尝试了下面的方法,但是标题列在数据之后,理想情况下应该是第一列 有人能帮我吗 val header = sc.parallelize(Seq(df.columns.mkString("|"))) val data = df.map(_.mkString("|")) val final = header.union(data) final.coalesce(1).saveAsTextFile("path") 为什么不直接将数据帧写入文件 使用以下代码将管道分隔数据帧写入文件: mydf.

Apache spark pyspark py4j创建java字符串数组

我想从Pyspark调用FAT jar的main方法 以下是jar(Scala)主要方法的入口点: 为了调用上述方法,我需要使用pyspark的py4j创建一个数组[String]: str_array = sc._jvm.java.lang.reflect.Array.newInstance(sc._jvm.java.lang.String, 3) str_array[0] = "228" loaded_class = sc._jvm.java.lang.Thread.currentThre

Apache spark 无法在spark sql中注册自定义项

我试图注册我的UDF函数,并希望在我的spark sql查询中使用它,但无法注册我的UDF,我遇到以下错误 val squared = (s: Column) => { concat(substring(s,4,2),year(to_date(from_unixtime(unix_timestamp(s,"dd-MM-yyyy"))))) } squared: org.apache.spark.sql.Column => org.apache.spa

Apache spark 如何配置纱线以分配最少数量的容器?

我在一个纱线簇上并行运行多个Spark作业。我发现Thread同时启动了许多这样的工作,但只为驱动程序分配了一个容器,没有执行器。这意味着这些Spark作业实际上处于空闲状态,等待执行器加入,而通过将执行器分配到其他作业,可以更好地利用这种处理能力 我想将Thread配置为至少为一个作业分配两个容器(一个驱动程序+一个执行器),如果没有,则将其保留在队列中。如何以这种方式配置纱线 (我在AWS EMR群集上运行,几乎具有所有默认设置。)如果您的纱线使用,您可以限制并发运行的应用程序的数量,以及A

Apache spark Spark Streaming使套接字数据流没有更多数据,无法在初始化时连接到所需端口

我也经历过很多类似的情况,大多数认可的答案是,端口必须打开才能启动连接,因此spark无法连接。我已确保端口侦听文件在spark streaming文件之前运行。有人能帮忙吗 端口打开文件 import tweepy from tweepy import OAuthHandler from tweepy import Stream from tweepy.streaming import StreamListener import socket import json consumer_key

Apache spark 为什么在将聚合数据帧写入文件接收器时出现异常?

我正在流数据帧上执行聚合,并尝试将结果写入输出目录。但我有一个例外说 pyspark.sql.utils.AnalysisException: 'Data source json does not support Update output mode; 我在“完成”输出模式下也遇到了类似的错误 这是我的代码: grouped_df = logs_df.groupBy('host', 'timestamp').agg(count('host').alias('total_count')) r

Apache spark 在数据块中像%[A-Za-z]%这样的过滤器

我试图在Databricks笔记本中使用类似“%[A-Za-z]”的table.column,但它不返回任何值 它在SQLServer中工作,但在Pysql中似乎不工作 有人知道Databricks的替代方案吗?功能有限,因此您需要使用: 更新:真实示例: %python df=spark.createDataFrame([{'id':1,'s':'12323'},{'id':1,'s':'123T23'}), schema='id int,s string') df.createOrRepla

  1    2   3   4   5   6  ... 下一页 最后一页 共 174 页