Apache nifi 如何为单个流文件指定优先级属性?

我需要在NiFi中使用PriorityAttributePriorizer 我在下面的参考中观察到优先顺序。 如果我收到10个流文件,那么我需要为每个流文件设置唯一的优先级值 在此之后,指定队列配置必须为PriorityAttributePriorizer 然后根据优先级值处理流文件 我如何为单独的流文件设置优先级值,或者Nifi中的哪个优先级设置器适用于我的案例?PriorityAttributePriorizer通过查找名为“priority”的流文件属性并根据优先级的值按字典顺序对流文件

Apache nifi 文件名中的nifi表达式concat文本

我在nifi数据流中创建了一个RouteOnAttribute处理器,我希望它只选择要解析的特定文件。 在属性中,我创建了一个属性“filetofetch” 用下面的表达 ${filename:contains('INCOMING.D151221') 我需要获取文件名INCOMING.D 因此,今天的21 MARTS 2017文件名为 INCOMING.D20170321 我尝试过用类似这样的东西来扩展文件名 ${filename:contains('INCOMING.D'+ ${now(

Apache nifi 如何使用ApacheNIFI处理excel文件?

我想使用nifi使用excel 谁能在这方面指导我就好了 我试过找它,但没有运气 谢谢您使用的是什么版本的NiFi?从1.2.0版开始(感谢),您应该能够使用,然后通过将文件作为CSV进行操作来继续流程。非常感谢。真的很感激

Apache nifi nifi中的JVM内存不足异常

我不能使用任何处理器,因为它会抛出异常,当我想清除连接队列时,它会抛出异常,如下所示:从队列中删除了0个流文件(0字节) 无法删除流文件,因为java.lang.IllegalStateException:无法更新存储库,因为此时所有分区都不可用。写入存储库将导致损坏。这通常是由于存储库磁盘空间不足或JVM内存不足造成的。在这种情况下,听起来好像磁盘已满。您可以使用bootstrap.conf中的Java参数提高分配给运行ApacheNIFI的JVM的堆数量 # JVM memory setti

Apache nifi 如何在ApacheNIFI中创建XDB处理器?

我想通过ApacheNIFI将数据推送到Influxdb中,但默认情况下,nifi中没有Influxdb的处理器。根据我的搜索,我发现可以在ApacheNIFI中创建新的处理器。因此,请帮助我了解如何在ApacheNIFI中制作PutInfluxdb处理器。ApacheNIFI 1.6.0版本包括InfluxDB包。它有xdb处理器。() 有关创建自定义处理器的问题,请查看以下链接: 我正在使用Apache nifi 1.4.0…如何将其更新为Apache nifi 1.6.0@Siva

Apache nifi Nifi validaterecord和convertrecord使用AvroschemareRegistry验证和转换json记录

我需要帮助理解NiFi中ValidateRecord和ConvertRecord处理器的功能 我的要求 我有一个JSON流文件内容,它具有各种属性,其中很少有字段是必需的,很少有字段是可选的,也很少有字段是时间戳字段,很少有双精度类型。在LOV中,有些字段的值应为 我使用了AvroSchemaRegistry和下面的模式定义: { "namespace": "nifi", "name": "test_json", "type": "record", "fields

Apache nifi Nifi处理器未正确解析JSON

我使用EvaluateJsonPath从JSON中提取一个特定值。我使用以下JSONPath表达式: $.data[?(@.containerType == 'SOURCE' && @.path == 'SOURCE_KYLO_DATALAKE')].id 这是我调用JSONPath的JSON文档: {"data":[{"id":"dc18bf87-c5a6-4600-9584-e79fb988b1d0","path":["@Rakesh.Prasad@diageo.com"

Apache nifi NiFi-连接到另一个实例(S2S)

我正在尝试使用SiteToSiteProgence报告任务 目标是在两个停靠的NiFi实例之间发送源数据,一个在8080端口,另一个在9090端口 我在目标NiFi上创造性地创建了一个名为“IN”的输入端口,源NiFi上的服务配置为: 但是,我得到了以下错误: 由于无法与远程NiFi群集通信以确定远程群集中存在哪些节点,因此无法刷新远程组的对等节点 我还公开了目标docker中的端口10000。如评论中所述,容器之间似乎存在网络问题 提问者最终通过不使用容器解决了这个问题。看起来您的两个doc

Apache nifi 流文件的大小是否影响I/O

流文件大小的考虑因素是什么?它是否会影响I/O?如果提供足够的内存,它是否越大越好?因为NIFI会将每个流文件从磁盘保存到内存 特别是对于Kafka消费处理器,它包含一个名为消息标界器的属性。通过设置,一个流文件可以包含多个事件,如果没有设置,一个流文件只能包含一个事件。比如说,如果FlowFile包含1000个json对象而不是1个json对象,性能会更好吗 尤其令我困惑的是这份文件 它表示将json值存储在FlowFile属性中,这意味着它每个FlowFile处理一个json对象,一个Fl

Apache nifi 如何在apachenifi中连接putHQL

我正在尝试连接到hdinsight上的hive。这些是我遵循的步骤 1.创建文本文件并插入create table语句 2.使用getFile processor将其转换为flowfile。此流文件将 输入到puthiveQL处理器 当我尝试执行此数据流时,它抛出错误,如所附图像所示 您是否可以共享处理器配置和试图传递给处理器的流文件内容?您是否可以共享处理器配置和试图传递给处理器的流文件内容?

Apache nifi apachenifi中的批处理流文件

我已经编写了自定义nifi处理器,它尝试批处理输入流文件 然而,它的行为似乎并不像预期的那样。发生的情况如下: 我在服务器上复制粘贴一些文件FethFromServerProcessor从服务器获取这些文件并将其放入queue1MyCustomProcessor从queue1批量读取文件。我在MyCustomProcessor上定义了batchSize属性,在其onTrigger()方法中,我通过执行以下操作从当前批中的queue1获取所有流文件: session.get(context.get

Apache nifi Apache NiFi,将属性写入flowfile内容

是否可以选择在不使用AttributesToJSON处理器的情况下将流文件的属性作为内容写入?取决于内容的格式以及属性的位置 如果您只想在内容中获取一个属性值并替换其中的任何内容,那么替换值为${my.attribute}的ReplaceText将使流文件内容成为my.attribute的值

Apache nifi Nifi中的GetFile错误:目录不存在

我在VM Ubuntu中安装了Nifi映像docker。我尝试使用GetFile processor获取一个xml文件,但是当我使用Ubuntu中创建的特定目录设置输入目录字段时,返回如下错误消息: 根据“/home/john/nifi/inputs”验证的输入目录无效,因为目录不存在 正确的类型路径是这样的/home/john/nifi/inputs还是介于${absolute path}之间 非常感谢使用“Docker exec-it/bin/bash”进入Docker容器 然后“cd~”转

Apache nifi 群集Nifi运行不稳定,问题经常出现

java.net.SocketTimeoutException:读取超时 3节点,Nifi-1.10.0,ZK-3.6.5 我重置了相关设置,使Nifi在给定的时间内响应,如下所示。但这种方式无法工作 nifi.cluster.node.connection.timeout=120 sec nifi.cluster.node.read.timeout=120 sec nifi.zookeeper.connect.timeout=30 secs nifi.zookeeper.session.t

Apache nifi 如何加快NiFi启动时间

我想知道我们是否可以加快NiFi启动时间,因为在大多数情况下,我们需要在机器上本地测试NiFi,但正常启动几乎需要10分钟 非常感谢,如果有人能帮忙的话 我正在使用NiFi 1.8 您使用的是什么nifi版本?在商品笔记本电脑上,nifi的一般构建应在大约18-20秒内启动,而集群应在默认情况下在

Apache nifi 如何在Nifi 1.9.2和Zookeeper 3.x中顺利恢复节点?

Nifi中有3个节点正在运行,但是其中一个突然丢失,日志显示本地流与集群流不同。我想选择以前成功的解决方案,删除失败节点的flow.xml.gz并重新启动它 我不确定故障节点中的数据是否会丢失?非常感谢您的帮助! 为什么节点会丢失?我发布了如下日志 2020-08-20 08:17:33,019 ERROR [Reconnect to Cluster] o.a.nifi.controller.StandardFlowService Handling reconnection request fa

Apache nifi NGINX代理后NiFi的远程进程组未经授权/匿名

我在NGINX反向代理后面运行Nifi。两者都是docker容器。我正在开发一个使用远程进程组的MINIFI流。未来,预计MINIFI将通过代理访问远程进程组。 如果我像这样使用URL,我的远程进程组工作得很好https://nifi:8443/nifi (无代理)。当我尝试将nifi.example.local与端口7443或8443一起使用时,它就会中断。GUI中的远程进程组说 401: Unauthorized with explanation: null. nifi-user.log说

Apache nifi 依赖于前两个处理器的处理器

我有三个PutSQL-处理器。理论上,我可以按简单的顺序执行它们:1-2-3,因为只有第三个处理器需要在前两个处理器之后执行,但是1和2也可以并发运行,因为它们彼此独立 为了加快速度,我想同时运行1和2,并在两个都成功后触发第三个: 1 ---\ ----> 3 2 ---/ 我在文档中找不到任何东西(或者我只是用错误的关键字查找),但是否有一个处理器可以像和操作员一样运行 理想情况下,处理器等待来自1和2的成功消息,然后再将成功消息发送到3在该ca中,您将使用带有通用相关

Apache nifi 在流期间执行NIFI InvokeHTTPProcessor一次,而不是在每个入站流文件的基础上执行

我有一个NIFI流,可以将文件从一个FTP服务器移动到另一个FTP服务器。该流以ListSFTP处理器开始,以PutSFTP处理器结束。使用PUTSFP处理器进行身份验证所需的密码存储在另一个应用程序中,该应用程序公开REST端点以获取密码。我希望获得一次密码,并使用相同的密码将所有获取的文件放入目标SFTP服务器。在这种情况下,请告知我可以在何处/如何使用InvokeHTTP处理器,这样就不会对每个流文件调用它(在每个流文件的基础上获取密码是没有意义的)。创建一个基于时间的并行流,该并行流取决

Apache nifi apache nifi:hive关闭连接,工作流停止工作

我注意到一些工作流停止工作是因为连接池断开了与配置单元的连接,如果我停止并重新启动连接池,则任何工作都将停止。如何避免仅在蜂巢上工作时出现此问题?有人面对过这个问题吗 这是一个错误,DBCP当前假定连接有效,因此它可以在下次处理器运行时返回这样的连接,从而导致您看到的错误 我认为HiveConnectionPool将受益于中的相同修复,也许还有其他DBCP设置,以确保只返回/借用有效连接。我写了一封信来报道这件事 不幸的是,我认为目前唯一的解决办法是重新启动NiFi

Apache nifi 使用时间戳修改流文件

我有从设备到ApacheNIFI处理器的视频数据,我想向这个流文件添加时间戳,并使用PutKafka处理器存储在Kafka中。最好的方法是什么?您可以使用UpdateAttribute处理器,并使用表达式语言${now():toNumber()}将当前时间表示为毫秒,并将其作为属性添加到流文件中。然后可以路由到PutKafka 除了Andy所说的之外,我想提到的是,据我所知,Kafka没有每条消息的标题属性,因此将时间戳+视频传递给Kafka的唯一方法是将时间戳放入有效负载中,然后使其不再是有效

Apache nifi 作为API的有用数据流';s

是他们的任何ApacheNIFI API的,在那里我可以作为数据流重用为API 例如:数据从本地到HDFS(数据流),我需要将其用作标准API吗?它们在ApacheNIFI中是否存在任何可能性)wiki中有一个模板。该模板演示了一个简单的web服务,它在端口8011上向HTTP请求返回“Hello,NiFi!”。您可以修改它以侦听其他端口并使用自定义逻辑进行响应 你能详细说明一下你所说的标准API是什么意思吗?NiFi数据流可以配置为侦听HTTP(S)请求,如果这是您的意思的话。嗨,James,

Apache nifi ApacheNIFI 1.4.0问题和问题

我试图将数据从S3复制到HDFS,观察到了几个问题,并且没有什么问题 问题 Processor ConvertJSONTAVRO-如果流文件不是有效的JSON,那么处理器将陷入无限循环,并出现以下错误 ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] ConvertJSONToAvro[id=c09f4c27-0160-1000-6c29-1a31afc5a8d4] failed to process session due t

Apache nifi Nifi记录最大时间戳

我在每个流文件中都有时间戳值,我需要将当前流文件的时间戳与以前的流文件进行比较,如果使用Put文件处理器将时间戳更大,那么最后我需要在文件中有最大时间戳。我通过以下方式尝试了更新属性处理器的高级功能: 添加了一个名为max\u timestamp的规则,其条件为${getStateValue(“maxTimestamp”):lt(${timestamp})},并将属性maxTimestamp的操作设置为${timestamp} 我还将“设置存储状态”功能设置为在处理器中本地存储,但我没有看到将m

Apache nifi 如何在数据转换模板中增加/更新数据

请告诉我在数据转换模板中添加提交发布高水位线的过程,方法是将水位线值设置为高水位线,就像我们在数据摄取中所做的那样。因为我需要获取增量/更新的数据 我是通过阅读文档得到这个解决方案的,但却不知道如何实际操作。最终,我的需求是在数据转换中获得增量和新数据 如果您对此有意见,请给我回复您的模板需要从LoadHighWaterMark处理器开始。这用于将当前高水位线值放入流文件属性中。初始值为null,直到由ReleaseHighWarkMark处理器更新为止。LoadHighWaterMark处理器

Apache nifi 脚本在终端中运行良好,但为;“未找到”;使用executestream命令运行时

当我登录到终端时,我能够很好地运行一个脚本,但是当在NiFi内部运行相同的脚本时,我从处理器得到一个错误。 我确保拥有脚本的用户/组与NiFi运行的用户相同 ExecuteStreamCommand[id=<id>] Failed to process session due to java.io.IOException: Cannot run program "backup.sh" (in directory "/etc/foo/bar"): error=2, No such fi

Apache nifi 如何使用QueryDatabaseTable获取所选年份记录?

我想使用QueryDatabaseTable处理器获取特定年份记录,如果我在QueryDatabaseTable中使用select*from table where year>2018,我该如何实现。请帮助如果使用自定义查询,它将接受自定义查询并通过将自定义查询的结果集别名为表名来包装它,而不是使用表名和列来返回属性。因此,在您的案例中,生成的SQL的SELECT部分如下所示: SELECT * FROM (select * from table where year > 2018) te

Apache nifi nifi:如何在csv文件中合并多个列?

nifi版本:1.5 输入文件: col1,col2,col3,col4,col5,col6 a,hr,nat,REF,6,2481 a,hr,nat,TDB,6,1845 b,IT,raj,NAV,6,2678 我想将最后三列合并为:delimiter和separator by/based col1 预期产出: col1,col2,col3,col4 a,hr,nat,REF:6:2481/TDB:6:1845 b,IT,raj,NAV:6:2678 我无法找到解决方案,因为很多响应都是基

Apache nifi 如何在nifi中为自定义处理器启用SiteToSiteProgence报告?

我有一个自定义处理器,并启用了用于捕获事件的报告任务 但在SiteToSiteProgence报告任务中不会捕获自定义处理器的事件 抱歉,SiteToSiteProgence不支持所有处理器,那么我从哪里可以获得不支持SiteToSiteProgence报告的处理器列表 我可以为所有处理器启用它吗?默认情况下,站点到站点起源报告任务应收集并传输系统上的所有起源事件。您可以在RT上配置一些属性来控制发送的内容: 事件类型-允许使用逗号分隔的列表筛选收集的事件类型 组件类型-允许筛选使用正则表达式

Apache nifi 如何用NULL替换从ExecuteSQL提取的NIFI属性值

我正在使用ApacheNIFI ExecuteSQL处理器从Oracle数据库中提取数据。我提取了字段X、Y和Z,现在我想将列X的值替换为NULL,并将数据加载到HDFS。 有人能建议在ExecuteSQL processor之后使用哪个处理器来实现这一点,以及必须在处理器中配置哪些更改吗?方法1:(最简单) Select Cast(NULL as String) as X,Y,Z from <schema_name>.<tab_name> //change the sy

Apache nifi 如何在nifi中白名单主机头?

尝试从loadbalancer访问nifi时,在eks群集中部署了nifi,出现以下错误: 系统错误 请求[/nifi]中包含无效的主机头[abc.com] 检查请求操纵或第三方拦截 有效的主机头为[空]或: 127.0.0.1 127.0.0.1:8443 本地服务器 本地主机:8443 ::1 nifi-deployment-59494c46dc-v4kk6 nifi-deployment-59494c46dc-v4kk6:8443 172.35.3.165 172.35.3.165:844

Apache nifi Nifi在单个位置存储Azure Blob帐户\名称/帐户\密钥

我在很多地方都有和。从ducumentation来看,这些处理器支持在属性或值注册表中指定“存储帐户名称”和“存储帐户密钥”。好吧,这两种方法都是不安全的,敏感信息的真正方法是使用参数上下文 因此,我最终在多个处理器上复制粘贴了名称密钥对。考虑到我有3个环境,非常麻烦 是否有更好的方法跨多个处理器共享DeleteAzureBlobStorage/PutAzureBlobStorage配置帐户?是的,这些处理器都有一个属性,该属性是提供Azure凭据的控制器服务。定义控制器服务并填充一次,然后从任

Apache nifi 远程访问ApacheNIFI(v1.11.4)

我正在本地机器上运行ApacheNIFI(版本1.11.4),比如在端口(8090)上。 它在我的机器上运行良好。 这是没有注册表/证书/LDAPS/安全性的正常设置 我想在专用网络中运行的其他计算机上访问此NiFi。 为此,我在我的机器的防火墙上启用了这个8090端口(作为TCP的入站规则) 但无法远程访问NiFi 我测试了这两种情况(使用默认端口和自定义post): 案例1:在我的本地计算机上,NiFi在端口8080上运行,我也在端口8080上设置了防火墙入站规则,其他成员应该能够远程访问端

Apache nifi 使用ApacheNIFI在CSV中转换日期格式

我需要在ApacheNIFI环境中修改CSV文件 我的CSV看起来像文件: Advertiser ID,Campaign Start Date,Campaign End Date,Campaign Name 10730729,1/29/2020 3:00:00 AM,2/20/2020 3:00:00 AM,Nestle 40376079,2/1/2020 3:00:00 AM,4/1/2020 3:00:00 AM,Heinz ... 我想将带有AM/PM值的日期转换为简单日期格式。每行从2

Apache nifi ApacheIgniteNIFI集成

我正在尝试在nifi中创建/使用ApacheIgnite缓存。我使用的是nifi的1.13.2版,找不到PutIgniteCache和GetIgniteCache选项。有人能帮我吗。此版本是否支持ignite? 从1.12.x迁移到1.13.x 已从便利版本中删除以下nar。它们仍然是在maven存储库中构建和提供的,因此您可以将它们添加到部署库文件夹中,并根据需要使用它们。包括:;nifi livy nar,nifi livy控制器服务api nar,nifi-kafka-0-11-nar,n

Apache nifi 在linux机器上运行NiFi

我刚刚下载了linux版的nifi-0.5.1 尝试使用命令运行Nifi时:/Nifi.sh run 我在nifi app.log中遇到以下异常: 2016-03-13 05:29:42959警告[主要] org.apache.nifi.web.server.JettyServer无法启动web服务器。。。 关闭。org.apache.nifi.web.NiFiCoreException:无法启动 流量控制器。 位于org.apache.nifi.web.contextlistener.Appl

Apache nifi ExecutionScript输出两个不同的流文件NIFI

我将executionScript与python一起使用,我有一个数据集,它可能有一些损坏的数据,我的想法是处理好的数据,并将其放在我的flowfile内容中到我的成功关系中,损坏的将它们重定向到失败关系中,我做了如下操作: for msg in messages : try : id = msg['id'] timestamp = msg['time'] value_encoded = msg['data'] hexFram

Apache nifi Apache NiFi-请求包含无效的主机头

我正在我的工作场所使用Docker运行一个ApacheNIFI实例,并安装了一个代理。我将正式容器拉下来,旋转容器,将其设置为端口8081,而不是8080,因为工作代理设置为8080 但是,当我尝试使用http://localhost:8081/nifi出现以下错误 系统错误 请求[/nifi]中包含无效的主机头[localhost:8081]。检查请求操纵或第三方拦截。 我发现网上有几篇帖子提到了nifi.properties文件,但除了旋转图像之外,我对Docker不是很有经验 如果有人能提

Apache nifi nifi将键值对附加到json流文件

如何将键/值对附加到Nifi中的json流文件中。我尝试了replaceText处理器,替换策略为append,但无法将键/值对正确插入到JSON中。我在另一篇文章中提到过这一点,但您需要使用理解JSON的处理器。您可以为此使用UpdateRecord。。。它需要您的模式有一个额外的字段,您可以将该字段设置为null,以便从一开始就可以使用相同的模式,或者你可以拥有模式的第二个副本。UpdateRecord中的读卡器将使用包含10个字段的模式,而编写器将使用包含11个字段的模式。你可以使用替换策

Apache nifi NIFI:将SQL查询中的每月转储作为CSV文件摄取到SFTP服务器中

我希望使用SQL查询将数据存储为各自的每月CSV,并将其存储到SFTP服务器中 例如,我的查询是: select fooId, bar from FooBar where query_date>=20180101 and query_date<20180201 --(for the month of January 2018) 请告知我如何推进这项工作。对于这种情况,我可以想出三种方法 方法1:使用MD5函数执行SQL查询以获取fooId的哈希值: 流量: Genera

Apache nifi Kafka系统日志:NIFI中最高效的工作流?

我实际上在法国的一家大公司工作,我们的目标是通过NIFI接收卡夫卡所有服务器(近1400台服务器)的系统日志(rfc5424格式)。我们选择NIFI是因为我们希望根据找到的appname将日志路由到它们相关的主题 所以我们会有很多小的流文件 实际上,我们遇到了性能限制:我们不能摄入超过5k msg/s的msg,我们希望摄入超过50k msg/s的msg。当然,如果可能,我们希望尽可能多地进行处理 我们有:listenSyslog(批处理大小1+已启用解析)=>RouteOnAttribute(从

Apache nifi NiFi处理器计划程序仅为一个文件运行

NiFi版本1.5 我有一个需求,正在探索调度程序。刚刚注意到,调度程序只处理一个文件,而不考虑文件的存在。比如说, GenerateFlowFile->UpdateAttribute->LogAttribute GenerateFlowFile每10秒运行一次。 UpdateAttribute已计划0/1*1/1*?*每1分钟 我注意到,GenerateFlowFile生成了6个文件。UpdateAttribute只处理6个文件中的一个文件 这里,我希望UpdateAttribute每1分钟运

Apache nifi 无法连接到自定义NiFi端口

我已经在conf文件的属性中将端口配置为9090。以下是我所做的仅有的两项更改: nifi.web.http.host=localhost nifi.web.http.port=9090 当我运行该文件并转到链接时,会出现“本地主机拒绝连接”错误 你知道会出什么问题吗?@Omkar Ingate 我在开发环境中有多个NIFI,包括9090上的一个NIFI。我所做的唯一改变就是端口。我不接触主机,因此配置看起来像: nifi.web.http.host= nifi.web.http.port=

Apache nifi 如何基于内容路由记录\事件

我使用GetFile收集包含JSON记录的文件。每个记录都包含一个参数(我们称之为Sensor),该参数的值为a或B 基于该值,我希望将每个记录路由到不同的输出端口–我应该使用哪个模块?我是否需要拆分记录,或者我可以停留在文件级别 可在文件中找到的记录示例 {"EventTime":"2020-12-07 04:49:00", "Sensor":"A", "Keywords":-9223372

Apache nifi 生成表获取SQL SERVER 2016的问题

我尝试从SQLServer中提取数据,并使用GenerateTableFetch。当我使用MYSQL数据库而不是SQL Server进行相同的生成表获取时,它的工作与预期的一样。每当我使用连接SQL Server时,都会出现如下错误 GenerateTableFetch[id=07bed292-0162-1000-0000-00004bc12345] failed to process session due to java.lang.IllegalArgumentException: Orde

Apache nifi 如何在nifi处理器中获取整个flowfile内容

我正在使用nifi开发数据漂移。在我的流程中使用SelectHiveQL processor。selectHiveQL的输出(流文件)需要放入下一个处理器。 将流文件内容存储到用户定义变量中的合适处理器必须使用Executescript中的相同变量来操作数据。处理器可以通过标准API直接访问传入流文件的内容。以下是一个例子: def flowFile = session.get(); if (flowFile == null) { return; } // This uses a cl

Apache nifi 如何查找GenerateTableFetch创建的所有文件已被处理

我们有一个流程,其中GenerateTableFetch从splitJson获取输入,该JSON将TableName、ColumnName作为参数。同时将多个表作为输入传递给GenerateTableFetch,下一个ExecuteSql执行查询 现在,我想在下一个处理器处理完一个表的所有文件后触发一个新进程(最后是PutFile) 如何找到为一个表创建的所有文件都已处理?您可能需要完成这项工作,在撰写本文时,有一个补丁正在审查中,我希望它能进入NiFi 1.9.0 编辑:同时添加潜在的解决方法

Apache nifi 使用ParquetRecordSetWriter时,NiFi合并记录处理器不符合最小存储箱大小

我正在尝试在NiFi(1.11.4)中构建一个流,该流从AMQ读取Avro消息,使用合并记录处理器将它们累加,然后将合并的拼花文件写入HDFS 问题是,当我试图在合并记录处理器中使用ParquetRecordSetWriter时(与AvroReader一起使用),合并内容永远不会基于最小存储箱大小阈值集发出-我试图设置非常低的值-它根本不起作用。同时,最大仓位年龄阈值工作正常 此外,如果我使用AvroRecordSetWriter,最小大小阈值也可以正常工作。 因此,我尝试使用AvroRecor

Apache nifi 颠簸-对嵌套数组重新排序

我正在尝试使用JOLT对嵌套数组重新排序。我的目标是将位于相同数组位置(i)的所有元素分组,并将它们添加到另一个数组中 输入: { “价值观”:[ [ "84139", "123" ], [ "230", "456" ], [ "230475", "789" ] ] } 预期结果: { “结果”:[//与值[i]相同的长度 [//长度与值相同 "84139", "230", "230475" ], [ "123", "456" "789" ] ] } 信息:请记住,两个数组(根和子数组)的长

  1    2   3   4   5   6  ... 下一页 最后一页 共 19 页