Flink 0.10.0最近刚刚发布。我需要从0.9.1迁移一些代码。但出现以下错误:
org.apache.flink.api.common.functions.InvalidTypesException:无法确定“class fi.aalto.dmg.frame.FlinkPairWorkloadOperator”中类型变量“K”的类型。这很可能是类型擦除问题。当前,类型提取仅在返回类型中的所有变量都可以从输入类型推断的情况下支持具有泛型变量的类型
代码如下:
公共类FlinkPairWor
我想通过HTTP协议将数据流的计算结果发送给其他服务。我看到了两种可能的实施方法:
在接收器中使用同步Apache HttpClient客户端
公共类SyncHttpSink扩展了RichSink函数{
私有静态最终字符串URL=”http://httpbin.org/post";
私有可关闭httpClient httpClient;
私有直方图httpStatusesAccumulator;
@凌驾
公共void open(配置参数)引发异常{
httpClient=HttpClients.
我是阿帕奇·弗林克的新手。我想创建一个数据流,并将另一个系统的值提供给它
我找到了如何添加“SourceFunctions”的示例,在该函数中,我必须等待来自源的值,并通过调用ctx.collect将这些值发布到Flink,然后再次等待,这是轮询
但我有一个数据源,它在值到达时调用函数(异步)。所以,我想做的是:当这个异步调用发生时,我想把值放到Flink数据流中,伪代码:
mysystem.connect_到_值((value)=>{myflinkdatastream.put(value.to
如果您注册一个表,并且其中一个字段是映射(extra,在本例中),则以下语句可以正常工作:
SELECT f1, f2 FROM customers WHERE extra['sportPrefs'] = 'Football';
现在尝试引用地图中不存在的密钥
SELECT f1, f2 FROM customers WHERE extra['moviePrefs'] = 'Action';
您将获得一个NPE,作业将退出。如果有一种方法可以检查映射中是否存在某个特定的密钥,这将是正常的。不
当我尝试向Flink 1.5-SNAPSHOT提交作业时,它会抛出异常
org.apache.flink.shade.netty4.io.netty.handler.codec.TooLongFrameException:HTTP内容长度超过10485760字节。我可以在日志中看到rhis异常。我试图增加akka.framesize的值,并将其设置为104857600b,但它仍然抛出错误
有什么建议吗?谢谢。看起来可能是应该遵循的正确问题Flink 1.5-SNAPSHOT不是Flink的发布版
据我所知,如果内存出现压力,RocksDBStateBend会将数据溢出到磁盘,但我找不到溢出操作发生在哪里,有人能帮我找到这段源代码吗 Flink的RocksDBState后端用于存储状态。RocksDB是一种基于磁盘的嵌入式键值存储。Flink的状态后端只是使用RocksDB的API进行写入和读取,RocksDB将数据放在磁盘上。您要查找的代码在RocksDB中,而不是在Flink中。非常感谢,现在就知道了。
我正在使用ApacheBeam和Flink runner以及JavaSDK。似乎将作业部署到Flink意味着构建一个80兆字节的胖jar,并上传到Flink作业管理器。
有没有一种方法可以轻松部署轻量级SQL来运行Beam SQL?也许部署了一个作业,可以让它如何获取和运行特殊查询?如果我理解你的问题,我认为目前不可能。目前,Beam SDK将始终构建一个fat jar,该jar将实现管道并包含所有管道依赖项,并且它将无法接受轻量级的即席查询
如果您对更多的互动体验感兴趣,请查看正在进行的使Be
我正在试用一个带有本地Flink群集的小程序,按照说明进行设置。示例wordcount程序运行良好,但当我尝试运行自己的程序时,它会在连接到作业管理器时暂停并失败。这是Flink1.5和JDK1.8
守则的有关部分如下:
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setStreaming(true);
options.setFlinkMaster("l
使用ApacheFlink1.4.2,我得到以下异常
com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
data (org.apache.flink.api.java.utils.ParameterTool)
parameterTool (com.test.event.configuration.JobConfiguration)
config
现在,我的flink代码正在处理一个文件,并用1个分区将kafka主题上的数据转储
现在我有一个关于2个分区的主题,我希望flink代码使用DefaultPartitioner在这2个分区上接收数据
你能帮我吗
以下是我当前代码的代码片段:
DataStream<String> speStream = inputStream..map(new MapFunction<Row, String>(){....}
Properties props = Producer.getP
我有一个特定的监控系统,在每个受监控的服务器上本地保存数千个文件(没有HDF)。我想用flink来查询这些文件。如果我在每台机器上创建一个worker节点,它们查询一个特定的文件,那么主节点如何知道将此任务发送到相关文件所在的节点?我的退休计划之一是最小化网络流量,避免在节点之间移动数据。有什么方法可以“暗示”它吗?我不知道你说的“…使用Flink查询文件”是什么意思。您可以创建一个知道如何读取本地文件的自定义文件,然后使用Flink对其进行解析/处理,并通过公开结果。如果此流中没有分区,那么所
我对flink(以及拼花地板/hadoop)非常陌生,所以我肯定在做一些非常愚蠢的事情。我正在尝试创建一个接收器,它将把我的数据源转储到拼花地板文件中
我的代码如下所示:
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
streamEnv.setParallelism(1);
streamEnv.enableCheckpointing(100, CheckpointingMode.EXACTLY_ONCE);
让我们假设一个设置,其中我们有一个或多个操作符连接到一个键控流,并且我们期望每个键有许多键和重要的分区状态(就大小而言)
是否有可能影响Flink将调度每个操作员实例的任务经理?在有n个任务管理器和m个操作员实例的设置中,最坏的情况是,这些m个实例中的多个(如果不是全部的话)最终只占用了n个任务管理器中的几个,因此耗尽了可用的资源。目前,Flink对每个任务和插槽都一视同仁。这意味着每个任务都应该可以部署到每个插槽。这不仅大大简化了调度逻辑,而且还通过指定不可满足的调度约束防止了用户自食其果。成
我正在构建一个Flink流媒体系统,可以处理实时数据和历史数据。所有数据都来自同一来源,然后在中拆分为历史数据和实时数据。实时数据得到时间戳和水印,而历史数据按顺序接收。在实时流被窗口化后,两个流被联合并流入同一个处理管道
我在任何地方都找不到EventTime流媒体环境中的所有记录是否需要加时间戳,或者Flink是否可以同时处理实时数据和历史数据的混合。这是一个可行的方法,还是会造成我经验不足而看不到的问题?对数据顺序有什么影响
我们有这样的设置,允许我们做部分回填。每个流都由一个id设置密钥
我的用例非常简单,我接收到包含“事件时间戳”的事件,并希望它们基于事件时间进行聚合。输出为每10分钟一次的周期性加工时间翻转窗口
更具体地说,键入的数据流需要计算7秒的计数
一秒钟的滚动窗口
一种滑动窗口,可提前1秒计数7秒
windowall可每1s输出所有计数
我无法对其进行集成测试(即,类似于单元测试,但是端到端测试),因为输入具有假事件时间,不会触发
这是我的片段
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
我正在尝试在流中进行一些验证,我正在检查无效的卡号,有人问我是否可以保留这些无效的卡号以备将来验证
在ApacheFlink上实现这一点的最佳方法是什么
谢谢好的,如果您希望能够重新启动作业并保留数据,那么我建议使用检查点的Flink状态。我不知道确切的用例,所以我无法判断您应该使用KeyedState还是Operator状态。但基本上,这个想法是将卡号或用于验证的任何东西保持在状态,然后使用savepoint取消作业,无论何时您想要再次启动它,都可以从给定的savepoint开始。这样,您将永
我从四个运动流中读取数据。每个流中的数据都是不同的数据类型。在中读取所有四个流之后,我分配时间戳和水印,并聚合每个流中的数据。四个聚合的结果都是使用相同的泛型对象输出的。我希望合并来自四个流的结果,以便将合并后的流发送到一个ProcessFunction。这本质上允许我像使用协处理函数一样使用ProcessFunction,但我能够处理来自两个以上流的数据(在这种情况下,ProcessFunction将接收来自所有四个单独流的聚合)
然而,我担心的是,这可能不适合水印。如果一个流的处理时间较长或
我这次的问题是:使用MapState时,可以安全地使用MapState.putkey,value将密钥的当前值修改为MapState,或者我需要执行MapState.removekey,然后再执行MapState.putkey,value,或者是否仍要更新此值
从Flink开始,状态抽象不是为并发访问而设计的,不应该在多个线程之间共享。那么,重新表述我的问题:我是否可以根据一个键将值更新为mapState,而不删除该键,然后再次放置该键?如何使用mapState避免ConcurrentModif
我想将下面的代码翻译成pyflink,然后在pyflink-shell.sh中运行它
public class MapDemo {
private static int index = 1;
public static void main(String[] args) throws Exception {
//1.获取执行环境配置信息
StreamExecutionEnvironment env = StreamExecutionEnvironme
目前,Flink可以将数据以ORC格式直接写入hive的hdfs文件,但需要每小时将分区插入hive表。有没有办法每小时触发一个函数?当然,您可以拥有一个KeyedProcessFunction和一个每小时触发一次的计时器。或者,您可以编写一个实现ProcessingTimeCallback的自定义接收器(或者扩展用于HIVE的接收器来实现此功能?)。您还可以实现一个每小时发出一次事件的自定义源
代码将在所有任务管理器中运行,对吗?我需要运行分区添加一次。对。因此,要么在并行度设置为1的运算符
UDF的整个scala项目如下所示:
我注册udf的操作如下:
①mvn scala:compile package
②cp table_api-1.0-SNAPSHOT.jar $FLINK_HOME/lib
③add the following sentence into $FLINK_HOME/conf/flink-conf.yaml
flink.execution.jars: $FLINK_HOME/lib/table_api-1.0-SNAPSHOT.jar
④create temp
我正在使用Flink 1.12,并试图通过Kubernetes集群(AKS)将job manager保留在HA中。我正在运行2个作业管理器和2个任务管理器吊舱
我面临的问题是任务经理无法找到jobmanager负责人
原因是他们试图为jobmanager(一种集群服务)提供K8“服务”,而不是访问领导者的pod IP。因此,有时jobmanager服务将解决对备用jobmanager的注册调用,这使得TaskManager无法找到jobmanager负责人
以下是jobmanager领导文件的内
我是弗林克的新手。我有一个问题,如果发送到下游节点的所有消息都是有序的?比如说,
[Stream] -> [DownStream]
Stream: [1,2,3,4,5,6,7,8,9]
Downstream get [3,2,1,4,5,7,6,8,9]
如果是这样的话,如果我们想处理好这个案件,我们该如何处理
任何帮助都将不胜感激 操作员可以有多个输入通道。它将按照接收顺序处理来自每个通道的事件。(操作员也可以有多个输出通道。)
如果您的作业在流和下游之间有多条路径,那么事件可以
我在官方指南中看到了以下示例
public class BoundedOutOfOrdernessGenerator implements WatermarkGenerator<MyEvent> {
private final long maxOutOfOrderness = 3500;
private long currentMaxTimestamp;
@Override
public void onEvent(MyEvent event,
我看到一些示例,它们将Flink表对象转换为数据流,并运行StreamExecutionEnvironment.execute
如何编写并运行一个连续查询,在不转换为数据流的情况下使用表API将数据写入流接收器
这似乎是可能的,否则指定流式接收器表连接器的目的是什么
TableAPI文档列出了连续查询和动态表,然而大多数实际的JavaAPI和代码示例似乎只使用TableAPI进行批处理
编辑:为了向David Anderson展示我的尝试,下面是在类似的Derby SQL表之上的三条Flink
我在家看书
它说:
作为接收器,upsert kafka连接器可以使用changelog流。
它将在数据之后写入INSERT/UPDATE_作为正常的卡夫卡消息值,
并将删除数据写入带有空值的Kafka消息(指示
钥匙的墓碑)
它并没有提到,若在信息写入upsert kafka之前更新_,那个么会发生什么
在同一链接()中,文档提供了一个完整的示例:
INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*),
COUN
谁能解释分组数据集上combineGroup和reduceGroup转换之间运行的不同行为
ExecutionEnvironment ExecutionEnvironment=ExecutionEnvironment.getExecutionEnvironment();
数据集输入=executionEnvironment.fromElements(1,2,2,3,4,5,6,7,8,9);
input.map(新的MapFunction(){
公共元组1映射(整数值)引发异常{
返回新的Tup
我想使用大小为2的FIFO队列来存储数据流的元素。在任何情况下,我都需要流中的前一个元素,而不是当前元素。为此,我在流代码之外创建了一个队列,并将当前元素加入队列。当我的队列有两个元素时,我将其出列并使用第一个元素
我面临的问题是,由于队列在流代码之外声明,因此无法将其排队。我猜这是因为流使用多个JVM,而我的队列将在一个JVM中声明
下面是一个示例代码:
val queue = Queue[Array[Double]]() //Global Queue
val ws = dataStream
我发现问题在于maven的代码库与flink运行版本不同。一旦我确保所有库都更新到flink 1.1.2,就没有问题了。尝试将jobmanager.rpc.address设置为0.0.0,因为默认情况下它是。因此,作业管理器似乎在默认情况下只接受来自localhost/127.0.0.1的连接。但有一个问题:通过路径Job Manager/Configuration/jobmanager.rpc.address,您在Flink Web UI中看到了什么IP?
package org.myorg.
我有两条溪流:
测量
WhoMeasured(关于谁进行测量的元数据)
这些是它们的案例类:
case class Measurement(var value: Int, var who_measured_id: Int)
case class WhoMeasured(var who_measured_id: Int, var name: String)
测量流包含大量数据。WhoMeasured流几乎没有可用性。事实上,对于WhoMeasured流中的每个who\u measured\u
是否存在导出描述flink群集服务状态的json文件的URL
即系统正常运行时间、作业状态、节点数等。您肯定应该看看监控REST API
可以访问该功能的文档
您还可以访问一些TaskManager指标,遗憾的是,文档中尚未描述这些指标,但您可以查看源代码:您肯定应该查看监控REST API
可以访问该功能的文档
您还可以访问一些TaskManager指标,遗憾的是,文档中尚未描述这些指标,但您可以查看源代码:
我创建了一个HA Flink v1.2集群,由1个JobManager和2个TaskManager组成,每个都位于自己的VM中(不使用Thread或hdfs)。
在JobManager节点上启动作业后,我将杀死一个TaskManager实例。在Web仪表板中,我可以立即看到作业被取消,然后失败。如果我查看日志:
03/06/2017 16:23:50 Flat Map(1/2) switched to DEPLOYING
03/06/2017 16:23:50 Flat Map(2/2) sw
从Eclipse运行Flink程序时出现以下异常:
Caused by: java.io.IOException: Insufficient number of network buffers: required 8, but only 4 available. The total number of network buffers is currently set to 2048. You can increase this number by setting the configuratio
我不确定如何进行调试,我怀疑这些问题可能是在EMR上运行的副作用。据我所知,EMR控制台中的大多数监控指标都是无用的。如果有关系的话,我将运行该程序作为EMR中的一个步骤,我遵循的指南如下http://docs.aws.amazon.com/emr/latest/ReleaseGuide/emr-flink.html。该程序也被认为是一个始终向上的解决方案,基本上它将不断地从卡夫卡流中读取数据并处理数据(如果这有任何关系,不确定是否有一个不同的配置我应该用于始终向上的解决方案)
我很乐意提供任何
我们正在使用多个卡夫卡主题,但希望优先考虑其中一些主题(~服务质量)
根据我在网上发现的情况,大家的共识是不要限制操作符,而是限制源代码,更具体地说是反序列化程序[1]
我们如何访问源中有关流媒体环境状态的信息(即主题落后于当前偏移量的程度)
目前,我们计划将整个设置转换为CoFlatMaps[2],并拥有一个控制流,该控制流为所有主题发出当前偏移滞后-低优先级流操作符,然后根据高优先级流的滞后进行休眠
你将如何解决这个问题?
Tl;dr:是否有办法跨taskmanager的源/反序列化程序共享
我在下面的URL中得到了一个CEP示例
“此练习的目标是为出租车出行发出开始事件,但在出行的前两个小时内没有与结束事件相匹配。”
然而,从下面的代码中,似乎可以找到一种模式,即发现游乐设施已在2小时内完成,而不是在2小时内完成
看起来该模式首先查找开始事件,然后查找结束事件(!ride.isStart),并在2小时内完成,所以它不解释为查找在2小时内完成的骑乘的模式吗
Pattern<TaxiRide, TaxiRide> completedRides =
例如
我有两个名为ds1和ds2的DataStream,DataStream ds3=ds1.union(ds2)。然后我想知道如何从ds3获得ds1.f2和ds2.f2的值
谢谢。Flink中的流并集与Multiset上的并集操作相同——您只需获得一个更大的流,就可以从两个输入流中获得所有元素
所以,换句话说,联盟不是一个联合体。对于其中一个流中的某个元组,ds3.f2是以前为ds1.f2或ds2.f2的值
根据您要完成的任务,您可以向每个元组添加第五个元素,以便了解其来源。或者您可能更愿意使
正如上面所说的,TwoPhaseCommitSinkFunction是在Flink 1.4.0中引入的,目的是实现端到端的一次语义访问。我对这个抽象类TwoPhaseCommitSinkFunction及其子类FlinkKafkaProducer011(源代码是和)有两个问题
TwoPhaseCommitSinkFunction有一个abort方法来中止事务。那么,调用abort方法的情况是什么?具体来说,它会在提交过程开始后被调用吗?这意味着放弃当前事务,并且只调用一次?我这样问是因为,正如评
我正在处理一个简单的案例,我们将流1(头寸)与流2(价格)连接起来,并将最新的头寸数据与价格数据链接起来。
为此,我扩展了一个RichCoFlatMapFunction,它构建了一个包装器对象,用于收集来自任一流的数据。
在此过程中,它还将数据存储在MapState中
在一天结束时,根据另一个流数据(例如日期更改流),我需要清除状态。
我该怎么做?
基本上我需要清除priceState和positionState。我不确定我们是否能得到一个广播流来做这件事
下面是连接两个流的示例代码
stati
我有以下测试:
testHarness.processElement2(new StreamRecord<>(element1));
testHarness.processElement1(new StreamRecord<>(new Tuple2<>(id, element2)));
testHarness.setProcessingTime(1); //let's assume it's the correct time for the timer in
从以前的经验来看,弗林克似乎只完成了一次
成功预提交后,必须保证提交成功
最终成功
我认为“成功的预承诺”是由Flink Task Manager实现的;“最终的成功”是通过弗林克水槽实现的
Flink sink节点如何实现“最终成功”
这一机制是否与检查点有关
Flink的两阶段提交接收器通常通过以下方式将其操作与检查点机制耦合:
onSnapshot:刷新所有记录并预提交
onCheckpointComplete:提交挂起的事务并发布数据
onRecovery:检查并提交任何挂起的事务
我正在实现一个流式应用程序,其中一个有状态的操作员正在尝试捕获“owner has items”关系。每个所有者键入的状态由所有者和每个项目的详细信息组成。项目的所有权可以更改,我希望能够将每个项目与其正确的所有者关联。由于不同所有者的操作员状态可能位于不同的子任务中,并且这些子任务旨在独立运行,因此我想知道共享状态的最佳方式是什么。我能够想到的一个解决方案是从子任务的侧面输出创建一个键控数据流,并将其发送给正确的所有者,然后从原始所有者清除状态。基本上:
子任务K1,状态为关于具有Item1、
我有一个方法def进程(row:org.apache.flink.types.row,fieldName:String):Unit=???
我要进行的处理要求我提取一个名为fieldName的字段。我可以看到RowAPI允许您按顺序提取,但不允许按名称提取
我猜用户必须向我传递模式信息,因此该方法看起来像def进程(row:org.apache.flink.types.row,rowSchema:??,fieldName:String):Unit=??
我已经研究了org.apache.flin
我使用getSideOutput创建一个side输出流,在使用getSideOutput进行处理之前,在预处理流中存在元素,但是当调用getSideOutput方法时,不会发出任何元素
代码如下
DataStream<String> asyncTable =
join3
.flatMap(new ExtractList())
.process( // detect code using for test
我是弗林克的新手。我正在尝试在我的应用程序中启用检查点和有状态。我看到了我们如何从Flink文档中存储键控状态。但我想知道我们是否可以存储非键控状态(ProcessFunction的状态)需要非键控状态有些不寻常,但确实存在
在用户代码中,这通常仅用于实现自定义源和汇,这就是为什么示例重点关注这些用例。但是在ProcessFunction中,您也可以执行相同的操作,即实现CheckpointedFunction接口(即initializeState和snapshotState方法)
非键控状态的
我想写一个数据源,它是来自TarantoolJava的数据流
有谁能给我一个关于如何按用户定义编写数据源的指南吗
这是我的代码:
package tarantooljava.streaming.flink_connecter;
import org.apache.flink.configuration.Configuration;
import org.tarantool.TarantoolConnection16;
import org.tarantool.TarantoolConnect
我想运行流媒体作业。当我尝试使用start clusted.sh和Flink Web界面在本地运行它时,我没有问题。
然而,我目前正在尝试使用Flink on Thread运行我的工作
(部署在Google Dataproc上)当我尝试取消它时
取消状态将永远持续,并且插槽中仍有一个插槽被占用
任务经理
这是我得到的日志:
2016-10-18 16:56:04,053 INFO org.apache.flink.runtime.taskmanager.Task -
Attempting to
我正在尝试将我的应用程序从flink流处理转换为flink批处理
对于flink数据流,我从一个包含多个JSON对象的预定义文件中读取字符串,并将JSON对象平面映射到tuple3收集器(第一个元素——JSON对象的一个字段,第二个元素——JSON对象的另一个字段,第三个元素——实际的JSON对象数据)
DataStream transformedSource=source.flatMap(新的flatMap函数(){
@凌驾
公共void flatMap(字符串值,收集器输出)引发异常{
Ob
我正在寻找一个解决方案,如何在执行过程中更改Flink中的源函数:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SourceFunction<String> mySource = ...; // this a function that I want to change during runtime;
DataStream<String> s
首先,我对流处理非常陌生,请随意纠正我对概念的误解:)
我正在使用ApacheFink。
我的源代码是一个FlinkKafkaConsumer,它已经添加了从卡夫卡获取的时间戳。
在我的处理过程中,我希望能够使用水印(为什么超出了这个问题的范围)
我想要的是抽象类boundedAutofordernessTimestampExtractor提供的水印生成行为
但该类仅提供:
公共抽象长时间戳(T元素)
如果覆盖它,它将为您提供元素,但不是最初由FlinkKafkaConsumer提供的时间戳
由
1 2 3 4 5 6 ...
下一页 最后一页 共 38 页