Apache flink 从Elasticsearch读取数据到Flink聚合?

Apache flink 从Elasticsearch读取数据到Flink聚合?,apache-flink,apache-storm,flink-streaming,trident,Apache Flink,Apache Storm,Flink Streaming,Trident,我正在尝试使用Kafka消息(作为StreamSource)更新Elasticsearch中的文档。使用windows和Elasticsearch连接器作为接收器批量写入Elasticsearch是可以的,但是,我们需要更新文档中的现有数据,并以批量执行的方式读取这些数据(不是针对每个元组,而是针对byKey()拆分后要聚合的整个窗口) 我们现在正在使用Storm Trident,它在a之前执行批量读取,在a之后写入更新的聚合,从而最大限度地减少与后端的交互。我只是在Flink中找不到类似的东西

我正在尝试使用Kafka消息(作为StreamSource)更新Elasticsearch中的文档。使用windows和Elasticsearch连接器作为接收器批量写入Elasticsearch是可以的,但是,我们需要更新文档中的现有数据,并以批量执行的方式读取这些数据(不是针对每个元组,而是针对
byKey()
拆分后要聚合的整个窗口)


我们现在正在使用Storm Trident,它在a之前执行批量读取,在a之后写入更新的聚合,从而最大限度地减少与后端的交互。我只是在Flink中找不到类似的东西-有什么提示吗?

在流上运行两个窗口调用怎么样-

window1
-从elasticsearch批量读取

window2
-批量进入elasticsearch

streamData
  .window1(bulkRead and update/join)
  .processFunction(...)
  .window2(BulkPush)
  • 您可以使用任何合适的方法进行批量阅读,如
    Storm Trident
  • 在window2中使用BulkProcessor

    • 在流媒体上运行两个窗口调用如何-

      window1
      -从elasticsearch批量读取

      window2
      -批量进入elasticsearch

      streamData
        .window1(bulkRead and update/join)
        .processFunction(...)
        .window2(BulkPush)
      
      • 您可以使用任何合适的方法进行批量阅读,如
        Storm Trident
      • 在window2中使用BulkProcessor

      谢谢你的回答!这需要为每个窗口加载1,而不是有一个内部缓存,只加载尚未出现的键-但你是对的,我可能需要加载、更新和保存processFunction中的所有内容-它看起来非常手动和非性能wrt数据库访问和大容量读/写。谢谢你的提示!谢谢你的回答!这需要为每个窗口加载1,而不是有一个内部缓存,只加载尚未出现的键-但你是对的,我可能需要加载、更新和保存processFunction中的所有内容-它看起来非常手动和非性能wrt数据库访问和大容量读/写。谢谢你的提示!