Google cloud dataflow DataflowPipelineRunner的Dataflow stock WordCount示例在Maven之外运行时失败

我能够使用文档中显示的maven exec:java命令使用DataflowPipelineRunner成功运行WordCount示例 然而,当我试图在我自己的1.8虚拟机中运行它时,它不起作用。我正在使用这些参数(在Windows上): 我得到以下错误: 2014-12-24T04:53:34.849Z: (5eada047929dcead): Workflow failed. Causes: (5eada047929dce2e): There was a problem creating t

Google cloud dataflow 云数据流中并发作业的最大数量

并发数据流作业的数量是否有限制? 我试图提交大约40个小作业以并行运行,但在7个作业之后,开始获得429个速率限制超出错误 谢谢, 是的,每个项目的并发作业数量是有限制的 在alpha中,我们目前支持10个并发作业。如果您希望在alpha期间增加此限制,请联系[谷歌网站]上的数据流反馈。如官方文件所述 每个云平台项目最多可以运行25个并发数据流作业 数据流服务目前仅限于处理10MB或更小的作业请求。作业请求的大小与管道的JSON表示形式密切相关;更大的管道意味着更大的请求

Google cloud dataflow 如何在数据流中创建用户定义的计数器?

如何在DoFns中创建自己的计数器 在我的DoFn中,我希望在处理记录时,每次满足条件时都增加一个计数器。我希望此计数器对所有记录的值求和。您可以使用,计数器的总值将显示在UI中 下面是一个示例,我在一个管道中试验了聚合器,该管道只让numOutputShards工作人员休眠数秒。(开始时的GenFakeInput PTransform只返回大小为numoutShard的平坦PCollection): PCollection输出=p .apply(新的GenFakeInput(options.g

Google cloud dataflow 谷歌云数据流PubSubIO不';不要从一个完整的话题开始阅读

我试图在Google云数据流中以“流”模式运行一个管道。 管道应该从PubSub主题中读取,但是在管道启动后我删除它、重新创建它并将所有消息重新发布到该主题之前,它实际上不会从该主题中读取 有没有办法让管道读取已发布的消息?听起来,提供发布/订阅(更多详细信息请参见)可以解决您的问题。创建订阅后,消息将被缓冲,允许在管道启动时读取这些消息。听起来提供发布/订阅(中的更多详细信息)可以解决您的问题。订阅创建后,消息将被缓冲,允许在管道启动时读取这些消息。请使用云控制台在发布订阅中创建自定义订阅。

Google cloud dataflow 数据流的错误报告

当数据流中发生错误时,是否有方法发送通知 有没有可能把它和类似的东西结合起来 或者使用Stackdriver或谷歌云的错误报告服务?我相信通过以下配置是可能的: 数据流日志被传输到谷歌云日志中 在Google Cloud Logging中创建自定义的基于日志的度量 在Google Cloud Monitoring(Stackdriver)中根据自定义的基于日志的度量创建警报策略 是的,我认为这是目前唯一的办法。我无法让它工作,所以我向谷歌提出了一份支持票,他们告诉我,创建基于日志的指标只适用于

Google cloud dataflow 超过8分钟阅读1 GB侧输入视图(列表)

我们有一个1GB的列表,它是在beam sdk 2.0上使用View.asList()方法创建的。我们正在尝试遍历列表中的每个成员,目前,对其不做任何有意义的事情(我们只是求和一个值)。仅仅读取这个1GB列表就需要8分钟(这是在我们设置workerCacheMb=8000之后,我们认为这意味着工作缓存是8GB)。(如果我们不将workerCacheMB设置为8000,则需要超过50分钟才能结束作业。)。我们使用的是n1-standard-32实例,它应该有足够的RAM。只有一个线程读取此8GB列

Google cloud dataflow 使用PubsubIO的Java OutOfMemoryError

我正在用Java编写一个简单的数据流管道: PubsubIO->ConvertToTableRowDoFn->BigQueryIO 管道正在工作——数据按预期到达BigQuery——但我在数据流工作日志中看到OutOfMemoryErrors 我尝试过的一个实验是通过添加Thread.sleep(100)来降低ConvertToTableRowDoFn的速度。我原以为这会使BigQueryIO的批量变小,但令我惊讶的是,这会使OutOfMemoryErrors更频繁 这让我觉得PubsubIO中

Google cloud dataflow 如何集成测试写入Bigtable的数据流管道?

据报道, 通常,在您的系统上执行本地单元测试更快、更简单 管道代码,而不是调试管道的远程执行 我想为我的Beam/Dataflow应用程序使用测试驱动开发,该应用程序因此写入Bigtable 然而,在Beam测试文档之后,我陷入了一个僵局——PAssert没有用处,因为输出PCollection包含org.apache.hadoop.hbase.client.Put对象,这些对象不会覆盖equals方法 我也不想让PCollection对它们进行验证,因为 直接获取PCollection的内容是

Google cloud dataflow 从dataflow到BigQuery的流式发布/订阅的首选数据格式是什么?

我们的过程目前有点笨拙,我们从数据库获得成批的CSV输出,这些输出被转换成json并流式传输到发布/订阅 这很麻烦,因为json中的每个元素都是字符串格式,当我们尝试写入bigquery时,它会失败,除非Java中有类型转换 是否有任何首选的类型化平面文件格式可用于小批量,以便在使用pub/sub传输时,将类型信息保留在记录级别?取决于管道的设置方式 一般来说,PubsubIO有几种读/写消息的方法: 读取Avros有效负载的消息并解析对象 对Protobuf负载的消息执行相同的操作 为您提供

Google cloud dataflow Beam.io.WriteToPubSub抛出错误“;给定的pcollpone[WriteToPubSub/Write/NativeWrite.None]不是dict、iterable或PCollection";

每当我使用“WriteToPubSub”时,就会出现一个错误。下面的代码是我试图调试该问题的代码。我的实际代码试图从WriteToBigQuery的失败中获取数据,以便将其推送到死信发布子主题。但当我尝试这样做时,我不断遇到下面的错误。 我正在运行ApacheBeam2.27和Python3.8 import apache_beam as beam from apache_beam.runners.interactive.interactive_runner import Interactive

Google cloud dataflow 有状态DoFn的状态是否可以随TTL过期?还是无限增长可以?

我在ApacheBeam(在Dataflow中运行)中遇到了这样一种情况,我基于创建了一个简单的有状态DoFn。上游窗口是全局的,改变它会影响下游聚合 目前,我并没有做任何事情来缩小这个州,它似乎只是变得无限大。这是真的吗?无界状态增长是一个问题吗 我只想将TTL附加到状态,但看不到此功能 我正在考虑在数据上存储自己的时间戳,并使用计时器定期清理表。这样做明智吗 存储的数据是某些事件数据上的缓存键。缓存键告诉我,我需要查找此事件的过去事件数据以生成当前事件。有状态的DoFn在这方面工作得很好,但

Google cloud dataflow 谷歌云数据流可以在没有外部IP地址的情况下运行吗?

我们在扩大使用谷歌云数据流时注意到,我们将开始在使用中的IP地址上超过我们的配额。我们知道,请求配额增加是一种选择(并且已经这样做了,并且已经被批准用于CPU),但我们想知道是否有可能(或将要)在没有外部IP地址的情况下运行数据流实例(出于配额原因和IP地址的成本).目前无法在没有外部IP地址的情况下运行Google云数据流实例。如果你需要额外的工人,你应该要求增加配额。现在可以了。从: 要关闭公共IP,请执行以下操作: 为您的网络或子网络启用专用Google访问 在云数据流作业的参数中,指定-

Google cloud dataflow Write-它是否附加或替换输出文件(谷歌云数据流)

我找不到关于它的任何文档,所以我想知道如果输出文件已经存在(在gs://bucket中),会发生什么行为 谢谢, G文件将被覆盖。这有几个动机: “类似于报告”的用例(计算输入数据的摘要并将结果放在GCS上)似乎比增量生成数据并在每次执行管道时将更多数据放在GCS上的用例要频繁得多 如果重新运行管道是幂等(-ish?)的,这是很好的。例如,如果您在管道中发现一个bug,您可以修复它并重新运行它,然后享受覆盖的正确结果。在这种情况下,附加到文件的管道将很难使用 不需要为TextIO.Write指

Google cloud dataflow 如何使用Python SDK调试自定义接收器

我在使用Python SDK编写自定义接收器时遇到问题。 为了调试它,我使用了logging.info或print方法,但它不能很好地工作。 有什么好方法可以调试自定义接收器吗?您在日志记录方面遇到了什么问题 当使用Dataflow pipeline runner运行时,应该可以在云控制台中访问日志,如中所述 您还可以使用direct runner在本地执行管道,这可能会使调试更容易。在beam和data flow之外独立运行python代码,以获得预期的输入

Google cloud dataflow 谷歌云数据流实例的图像

当我运行Dataflow作业时,它会将我的小程序包(setup.py或requirements.txt)上传到Dataflow实例上运行 但是数据流实例上实际运行的是什么?我最近收到了一个stacktrace: File "/usr/lib/python2.7/httplib.py", line 1073, in _send_request self.endheaders(body) File "/usr/lib/python2.7/httplib.py", line 1035, in

Google cloud dataflow 使用outputWithTimestamp时,数据流管道正在处理过程中丢弃事件

我有一个云数据流管道,我在其中更改事件的原始时间戳,以模拟事件延迟到达的真实场景。然而,在管道的每次运行中,我似乎都会减少一定比例的事件。在DoFn中,我使用以下代码更改时间戳: Instant newTimestamp = originalTimestamp.minus(Duration.standardMinutes(RANDOM.nextInt(15))); c.outputWithTimestamp(KV.of(Integer.toString(RANDOM.nextInt(100)),

Google cloud dataflow 具有相同密钥的多个CoGroupByKey apache beam

我需要将管道中的主数据流(1.5TB)连接到两个不同的数据集(4.92GB和17.35GB)。我用于为这两个对象执行CoGroupByKey的键是相同的。有没有办法避免在第一次完成后重新洗牌联接的左侧?目前,我只是将输出保留为KV>。这似乎比在第一次连接后逐段发送每个元素要好,但第二个groupByKey似乎花费的时间比我预期的要长得多。我打算开始研究拆分CoGroupByKey,看看是否可以忽略分组的一侧,但我真的觉得在这一点上不降低到那个级别更安全 在处理主输入时,您是否考虑过访问较小的数据

Google cloud dataflow 将无界集合写入GCS

我见过许多关于同一主题的问题。但是,我在给地面军事系统写信方面仍然有问题。我正在读pubsub的主题,并试图将其推广到GCS。我提到过。但是,在最新的beam软件包中找不到IOChannelUtils PCollection<String> details = pipeline .apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic")); PCollectio

Google cloud dataflow 数据流发布/订阅应用方法的问题

我是dataflow的新手,尝试使用PubsubIO.writeStrings()to(“projects/market place sql/topics/emisions”)的apply方法时,我正在尝试向pub/sub写一些消息,它说: []该错误只是类型不匹配。管道类的apply()方法接受类型为pttransform的第二个参数

Google cloud dataflow 谷歌数据流可以使用现有的虚拟机而不是临时创建的虚拟机吗?

与标题相同,Dataflow是否可以使用临时创建的VM实例而不是已经创建的实例?在询问OP请求的原因后,我将提供以下潜在答案: 数据流背后的功能是在处理数据管道时实现高度并行。原始请求的背景故事是,当作为本地运行程序运行时,“某物”正在工作,但当使用数据流作为运行程序时,它没有按预期工作。这似乎导致OP思考“我们将使用本地运行程序运行数据流”。在我看来,这不是一个好主意。一种是使用localrunner进行开发和单元测试。本地跑步者不提供任何形式的水平缩放。。。它实际上只在一台机器上运行 当在分

Google cloud dataflow 如何在apache beam和dataflow中设置logback MDC?

我们正在使用ApacheBeam,希望设置logback MDC。logback MDC是一个很好的资源,当您收到一个请求并存储一个用户ID(在我们的例子中,它是custId、fileId、requestId)时,那么每当开发人员登录时,它就会神奇地将该信息标记到开发人员日志中。开发人员不再忘记在每次添加日志语句时添加它 我从一个端到端的集成类型测试开始,该测试将apache beam direct runner嵌入到我们的微服务中进行测试(在生产中,微服务调用数据流)。目前,我看到在调用exp

Google cloud dataflow Dataflow wordcount.py示例“;不支持按文件名导入";

使用Ubuntu 14.04, 数据流Python SDK 按照[release]中的说明,在尝试wordcount示例时加载所有内容后,我尝试得到错误“不支持按文件名导入” 我怀疑问题出在wordcount.py示例的第23行 import google.cloud.dataflow as df 这个问题有解决办法吗 我已经尝试了发布在的解决方案,但这并没有解决问题。由于在第一条import语句中失败,因此需要检查的是是否安装了Python数据流包。第四种方法是运行“pip冻结”。以下是在虚

Google cloud dataflow 如何解决java.lang.NoSuchMethodError:com.google.api.services.dataflow.model.Environment.setSdkPipelineOptions与google云数据流?

我复制了这个例子 我还从pom.xml复制了所有依赖项。当我运行它时 mvn compile exec:java -Dexec.mainClass=com.example.MyExample 它可以编译,但我得到java.lang.NoSuchMethodError:com.google.api.services.dataflow.model.Environment.setSdkPipelineOptions,堆栈跟踪指向p.run()行。除了依赖项之外,还需要添加google Cloud d

Google cloud dataflow “数据流数据存储”;“争论太多”;异常处理

我需要在数据存储中插入大约十亿个不同类型的实体。我正在使用数据流来完成这项工作。这些行表示其他实体的复杂索引,因此使用这些实体作为祖先。当我遇到5个“这些数据存储实体上的争用太多”错误,管道崩溃时,我通过管道获得了大约10% 我应该采用什么策略来管理它,这样我就可以在不崩溃的情况下插入数据,而这需要重新启动管道 一种想法是让数据存储将有问题的实体放在队列的后面,而不是崩溃。数据存储不支持单个实体或实体组的高qps写入。您必须围绕它设计应用程序。请参阅和周围的文档 几个关键点: 避免使用高qps

Google cloud dataflow 每个窗口是否可以通过读取gcs存储桶来更新Dataflow sideInput?

我目前正在创建一个PCollectionView,从gcs存储桶中读取过滤信息,并将其作为侧面输入传递到管道的不同阶段,以过滤输出。如果gcs存储桶中的文件发生更改,我希望当前运行的管道使用此新过滤器信息。如果我的筛选器发生更改,是否有方法在每个新的数据窗口上更新此PCollectionView?我原以为我可以在一个开始的包里做这件事,但我不知道如何或是否可能。如果可能的话,你能举个例子吗 PCollectionView 标记地图视图= 管道.apply(TextIO.Read.named(“T

Google cloud dataflow 数据流作业的排出未结束

我用这个命令执行流作业的“drain” gcloud alpha数据流作业--项目=xxxxxx排放 但三天后它不会结束! 这是此流作业的日志 21:14:36.000 http: TLS handshake error from 172.17.0.2:40277: EOF 21:14:36.000 http: TLS handshake error from 172.17.0.2:36255: EOF 21:14:36.000 Kubelet is healthy?: true 21:14:4

Google cloud dataflow apachebeam管道(Dataflow)-解释无界数据的执行时间

在梁管道执行的数据流监控界面中,每个转换框中都指定了一个持续时间(请参阅) 对于有界数据,我知道这是完成转换所需的估计时间。然而,对于无界数据,如我的流式处理,我如何解释这个数字 我的一些转换的持续时间明显高于其他转换,这意味着转换需要更多的时间。但是,关于这种不均匀分布如何影响我的执行,特别是如果我有一个窗口函数,还有什么其他的含义呢 此外,这是否与自动缩放有关?例如,如果执行时间超过某些阈值,是否会有更多员工被提速?或者自动缩放是否取决于输入端的数据量?在批处理和流式处理中,这是衡量这些步骤

Google cloud dataflow 如何通过gcloudapi排出数据流作业?

是否可以通过gcloud api排出正在运行的数据流作业?具体来说,是否有一个python实现,或者请求结构的文档,以便可以实现它?如果能够在不使用cli的情况下自动排空和重新配置数据流作业,那就太好了。多亏了cli,我才能够排空正在运行的数据流 更新后请求的正文: { “requestedState”:“作业状态” } 嗨,蒂埃里·法尔沃。我想在Dataflow中运行批处理作业,以处理将来可能变得巨大的小数据集。在批处理模式下,数据流需要在运行作业之前启动VM,这对于小型数据集处理来说是非常不

Google cloud dataflow 访问文件名中的信息(元数据)&;在梁管道中键入

我的文件名包含我在管道中需要的信息,例如,我的数据点的标识符是文件名的一部分,而不是数据中的字段。e、 g每台风力涡轮机生成一个文件turbine-loc-001-007.csv。e、 g和我需要管道中的loc数据。Java(sdk 2.9.0): Beams TextIO阅读器不允许访问文件名本身,对于这些用例,我们需要使用FileIO来匹配文件,并获得对文件名中存储的信息的访问。与TextIO不同,用户需要在FileIO读取的下游处理文件的读取。FileIO读取的结果是PCollection

Google cloud dataflow 是否可以在ApacheBeam中对两个PCollection执行zip操作?

我有一个PCollection[str],我想生成随机对 来自Apache Spark,我的策略是: 复制原始PCollection 随机洗牌 使用原始PCollection将其压缩 但是,我似乎找不到压缩2个PCollection的方法…如何将转换应用于将键附加到元素的两个PCollection,并通过转换运行这两个PCollection 请注意,Beam不能保证PCollection中元素的顺序,因此输出元素可能在任何步骤后都会重新排序,但这似乎适合您的用例,因为您只需要一些随机顺序。对两个

Google cloud dataflow 工作池缩小时出错:";无法在不丢失活动无序排列数据的情况下缩小尺寸”;

更新至最新SDK版本0.3.150326,由于此错误,我们的作业失败: (d0f58ccaf368cf1f):工作流失败。原因:(539037ea87656484): 如果不丢失活动无序排列数据,则无法缩小大小。旧尺寸=10, 新尺寸=8 工作编号:2015-04-02_21_26_53-11930390736602232537 无法复制,但我想我应该问一下这是否是一个已知的问题 综上所述,自动缩放目前似乎只是“实验性的”,但我可以想象这是云数据流的核心功能,因此应该得到充分支持 1087 [m

Google cloud dataflow 创建多个带标记的输出时Google数据流出堆

我有许多未分区的大型BigQuery表和文件,我希望以各种方式进行分区。所以我决定尝试编写一个数据流作业来实现这一点。我认为这工作很简单。我试着用泛型来编写代码,这样我就可以轻松地将它应用到TextIO和BigQueryIO源代码中。它在小表上运行良好,但在大表上运行时,我不断得到java.lang.OutOfMemoryError:java堆空间 在我的主类中,我要么读取带有目标键的文件(由另一个DF作业生成),要么对BigQuery表运行查询,以获取要切分的键列表。我的主要课程是这样的: P

Google cloud dataflow 数据流作业失败,返回“0”;“无法培养足够的工人”;

我的数据流作业失败,出现以下消息,我应该如何调试 工作流失败。原因:(65a939e801f185b6):无法提供足够的 工人:最少1人,实际0人 当服务无法从中分配虚拟机以执行作业时,将输出此消息。请在中检查您的配额。当无法从中分配虚拟机以执行作业时,服务将输出此消息。请在中查看您的配额。我也有同样的问题。然而,切换区域为我解决了这个问题。我相信,当没有可用资源时,它有时会发出相同的错误消息 我也有同样的问题。然而,切换区域为我解决了这个问题。我相信,当没有可用资源时,它有时会发出相同的错误消

Google cloud dataflow 如何定期触发窗口?

我希望每M分钟处理最后N小时的数据。数据通过pubsub传输时会有各种延迟,但我不想因为时间偏差或数据延迟而延迟窗口触发。也就是说,我只想处理触发时可用的数据,并希望基于墙时间触发(但每个窗口应包括基于数据事件时间的[Triggered time-M hours,Triggered time]之间的数据) 我尝试使用滑动窗口,但触发是基于事件时间的,并且似乎由于数据延迟而延迟 最好的方法是什么 谢谢,您应该在FixedWindows中使用触发器。这将与您描述的完全一样

Google cloud dataflow 将消息发布到数据流中的Pubsub主题

在数据流中将消息发布到发布/订阅主题的推荐方式是什么。 我曾经使用过客户端API,但不认为这是在数据流中处理此问题的最佳方法 PublishResponse-response=client.projects().topics() .publish(fullTopicName、publishRequest) .execute(); 从数据流作业向PubSubTopic发布消息的最佳方法是使用PubsubIO类。例如: Pipeline p=Pipeline.create(); //做一些变换 p、

Google cloud dataflow 升级到Beam 2.4.0后DataFlow Runner失败

我有一个用于测试的简单数据流作业,它在apache beam 2.1.0中成功运行,代码如下所示: publicstaticvoidmain(字符串[]args)引发异常{ DataflowPipelineOptions dataflowOptions=PipelineOptions工厂.as(DataflowPipelineOptions.class); dataflowOptions.setProject(“我的项目ID”); dataflowOptions.setStagingLocati

Google cloud dataflow 从Google云存储读取到本地计算机的数据流

我尝试了一个数据流作业,从谷歌云存储中读取数据并写入本地机器。我使用了DirectPipelineRunner。作业已成功完成。但是我看不到本地机器上写的文件。我是否应该指定任何ip/主机名以及与输出位置参数对应的本地位置?如何在本地计算机中指定位置 命令如下: gcloud dataflow jobs run sampleJobname1 --gcs-location gs://bucket/templatename1 --parameters inputFilePattern=gs://sa

Google cloud dataflow 隐藏数据流选项

我使用带有模板的Google cloud dataflow runner,并通过管道选项(例如用户名和密码)将敏感信息传递给它。任何从dataflow控制台打开我的dataflow作业的人都可以在侧面板中清楚地看到我通过选项发送的敏感信息。有没有办法隐藏这些选项以防出现?多谢各位 一种想法可能是更改管道,使其不从参数中获取敏感值,而是使用备用值。例如,不提供用户名和密码作为管道选项,而是从GCS文件中读取这些选项,该文件包含用户名和密码,并且该文件的权限不允许从其他人处读取。您是否能够为正在使用

Google cloud dataflow com.google.cloud.span.span异常:超过了截止日期

我已经被这个问题困扰了一天多,我想这里可能有人知道答案。基本上,作为一个简单的测试,我试图从一个表中读取数据,并将结果简单地输出到一个日志文件中。该表相当大(约1.67亿行)。我一直得到以下错误 java.lang.RuntimeException:org.apache.beam.sdk.util.UserCodeException:com.google.cloud.Spaner.SpanerException:DEADLINE_Exception:io.grpc.Status RuntimeE

Google cloud dataflow “什么是”呢;SQL启动程序中出现错误";是指在使用数据流SQL UI时?

我尝试使用DataflowSQLUI创建数据流作业。我遵守了规则,工作正常进行。我将数据源更改为BigQuery表。我的计划是: 从BigQuery表进行查询 将结果保存回BigQuery表 创建数据流作业时,收到错误消息: 这个错误意味着什么?谢谢你的帮助 数据流SQL还不支持日期类型(SQL验证程序中有一个bug没有捕获到该类型)。选择日期类型的字段将导致出现NPE错误消息,修复方法是只选择日期类型的字段。问题:如果需要查询一个表并将结果保存到另一个表,为什么不使用BigQuery引擎而不

Google cloud dataflow 在谷歌云中,如何查看与gPubSub主题/订阅匹配的所有数据流源/汇?

我们有大约100个Google Cloud PubSub主题/订阅、数据流和BigQuery/BigTable表 我可以列出子主题: gcloud测试版发布子主题列表 我可以使用xargs,并为每个主题列出订阅内容: gcloud beta pubsub topics列表订阅$topic\u id 我可以列出所有BigQuery表: bq ls[项目id:[数据集id] 和所有BigTable表格: cbt-项目$project-实例$instance ls 我可以列出所有正在运

Google cloud dataflow 谷歌和谷歌的区别是什么;s数据流和谷歌';什么是dataproc?

DataFlow本身有ETL、计算和流式处理为什么我们需要使用google的Dataproc 是一个完全管理和自我优化的云服务,允许您使用编程模型编写批处理和流式数据处理管道。它与许多开源和谷歌云数据源和汇集成 是一种完全托管的云服务,用于以简单、经济高效的方式运行ApacheHadoop和ApacheSpark集群。如果您有使用Spark、Hive或Pig的现有数据处理管道,这是移动管道的一种快速简便的方法。您可以随时安装自定义软件包、启动/停止和扩展这些集群。最重要的是,Google Dat

Google cloud dataflow 我可以修改apache beam转换中的元素吗?

包含以下规则: 3.2.2。不变性 PCollection是不可变的。一旦创建,就不能添加、删除或删除 更改单个元素。光束变换可能会处理每种情况 元素,并生成新的管道数据(作为新的 PCollection),但它不使用或修改原始输入 收藏 这是否意味着我不能、不能或不应该修改自定义转换中的单个元素? 具体地说,我正在使用python SDK,并考虑一个转换的情况,该转换将dict{key:“data”}作为输入,进行一些处理并添加更多字段{other_key:“some more data”}。

Google cloud dataflow 如何尽快限制Apache Beam中的PCollection?

我正在谷歌云数据流上使用ApacheBeam2.28.0(带有Scio SDK)。我有一个大的输入PCollection(有界),我想将其限制/采样到固定数量的元素,但我想尽快开始下游处理 当前,当我的输入PCollection包含例如2000万个元素时,我希望使用 input.apply(Sample.any(1000000)) 它会一直等到所有的2000万个元素都被读取,这需要很长时间 如何有效地将元素数量限制为固定大小,并在达到限制后立即开始下游处理,放弃其余的输入处理?好的,因此我的初

  1    2   3   4   5   6  ... 下一页 最后一页 共 11 页