将大型dask数据帧写入单个S3 CSV文件

我正在使用一个dask.distributed集群,我想将一个大数据帧保存到S3的单个CSV文件中,如果可能的话,保持分区的顺序(默认情况下,将数据帧写入多个文件,每个分区一个)。此外,此文件存储操作还应作为延迟/延迟任务执行(应在集群工作程序上执行) 我能想到的第一个解决方案是将分区存储在临时S3位置,然后在延迟函数中使用S3 multipart upload将这些文件合并/上传到一起。临时存储是这里的一个缺点 可以使用dataframe.to_delayed()完成吗?上载需要由单个延迟函数

Dask 如何使xarray.interp()并行工作?

我在一个大型3D数据阵列(气象数据:纬度、经度、时间)上使用xarray.interp,根据离散映射函数f将值(风速)映射到新值。 插值方法似乎只使用一个核进行计算,这使得该过程效率低下。我不知道如何使xarray使用多个内核来完成此任务 我确实通过htop和dask仪表板为xarray.interp监控了计算。 htop仅显示要使用的一个核心,仪表板不显示任何工人的任何活动。我能观察到的唯一dask活动是从磁盘加载netcdf数据文件。如果使用.load()预加载数据,则此dask活动将消失

DASK-MemoryError:无法分配形状和数据类型为int64的数组

正在获取错误 MemoryError:无法分配形状为(15145488917,)且数据类型为int64的数组 …尝试以下合并时 master = dd.merge(df1, df2) 以前的代码是 df1.count().compute() …具有输出 den 312019 num 312019 dtype: int64 den 970531 num 970531 dtype: int64 ……还有 df2.count().compute() …具有输出 den

为什么在dask中计算索引拼花地板文件上的形状如此缓慢?

我已经从位于同一文件夹中的多个拼花文件创建了一个拼花文件。每个文件对应一个分区 拼花地板文件是使用Python concurrent.futures在不同的进程中创建的。下面是我在一个进程中运行的代码示例: `df`是一个标准的熊猫数据帧,具有 22列不同类型,最多100e3行。 设置索引 df.set_indexcid,原地=真 写入单个文件 fastparquet.writefpath,df,compression='snappy,file\u scheme='simple df最多包含10

Dask 达斯克:请';pip安装zict&x27;在Centos 7上使用Python 2.7

在Centos 7上安装了带有“pip安装dask[complete]分布式--升级”Python 2.7的dask。 运行: 获取: Process-1:回溯(最近一次调用最后一次):文件 “/usr/lib64/python2.7/multiprocessing/process.py”,第258行,在 _bootstrap self.run()文件“/usr/lib64/python2.7/multiprocessing/process.py”,运行中的第114行 self.\u targe

Dask-从SQL加载数据帧而不指定索引列

我正在尝试从SQL连接加载Dask数据帧。根据,必须传入一个索引列。如果可能没有好的列作为索引,我该怎么办 这是合适的替代品吗 # Break SQL Query into chunks chunks = [] num_chunks = math.ceil(num_records / chunk_size) # Run query for each chunk on Dask workers for i in range(num_chunks): query = 'SELECT * F

在Dask中传递期货作为参数

将未来传递给Dask延迟功能以使未来保持稳定的最佳方式是什么?换句话说,我们如何确保函数将获得实际的未来,而不是它所代表的结果?通常语义是dask.delayed函数得到具体的结果,而不是dask-y函数。今天,如果没有一些技巧,这是不容易支持的 尽管如此,我还是推荐以下技巧: 把你的未来放在一个 谢谢这似乎是合理的。还发现dask.core.literal有效,使用它是否合理?人力资源管理,是的,这似乎是一个更好的主意。我建议提高PR,将其推送到顶级API中,并ping jcrist,看看他的

Dask read_json元数据不匹配

我正在尝试将json文件加载到dask df中 files = glob.glob('**/*.json', recursive=True) df = dd.read_json(files, lines = False) 数据中缺少一些值,并且一些文件有额外的列。 有没有办法指定一个列列表,以便所有可能的列都存在于连接的dask df中? 此外,它不能处理缺少的值吗?我在尝试计算df时遇到以下错误: ValueError: Metadata mismatch found in `from_de

Dask Xarray Distributed无法序列化

我需要通过线性插值对一些卫星图像进行采样,这些图像组织在一个数据阵列中。 在本地运行代码之前,我没有问题,但是如果我尝试在分布式系统上复制插值,我会返回以下错误: `Could not serialize object of type tuple` 要复制这个问题,需要在分布式或本地环境之间切换。 这里是代码的分布式版本 n_time = 365 px = 2000 lat = np.linspace(19., 4., px) lon = np.linspace(34., 53., px)

使用Dask逐帧读取视频文件?

我想知道Dask是否是处理视频文件的好工具。在本地,我使用OpenCV一次读取一帧并处理它。我认为,如果视频的每一帧都由单独的Dask任务处理,那将是理想的。另外,对于多个时间同步的视频文件,我想知道如何从每个摄像头一次读取一帧,并在一个任务中处理两个摄像头的帧 我不认为Dask支持生成器函数,但如果它真的支持,我认为这将是理想的。生成器任务将在任务图中一次生成一帧视频 下面是一个与我正在做的类似的示例脚本。它从多个视频文件中的每个文件中取出一帧,裁剪并调整每个文件的大小到标准大小,然后将这些文

将延迟对象中的dask序列添加到dask数据帧

我想使用dask数据帧中的列创建一个dask系列,并将其添加到dask数据帧中。创建新系列的函数可以是任意复杂的,并且可能使用dask.dataframe没有的函数。我的想法是为该函数使用dask.delayed装饰器,并从单个delayed对象创建一个dask系列。然后,我遇到了一个问题:必须将这个新系列的分区/分区与原始dask数据帧对齐,以便将其作为列附加。我可以假设由该函数创建的序列始终具有与原始dask数据帧相同的长度和索引。所以我尝试了一些“肮脏”的伎俩来实现它。见下文: impor

如何停止Dask中正在运行的任务?

当使用Dask的分布式调度程序时,我有一个在远程工作者上运行的任务,我想停止该任务 我该怎么阻止它?我知道cancel方法,但是如果任务已经开始执行,那么它似乎不起作用。如果它还没有运行 如果任务尚未开始运行,则可以通过取消关联的未来任务来取消该任务 future = client.submit(func, *args) # start task future.cancel() # cancel task 如果您使用的是dask集合,则可以使用cli

Dask 关于群集调度器和单机分布式调度器的混淆

在下面的代码中,为什么dd.read_csv在集群上运行? client.read_csv应在群集上运行 import dask.dataframe as dd from dask.distributed import Client client=Client('10.31.32.34:8786') dd.read_csv('file.csv',blocksize=10e7) dd.compute() 在这种情况下,一旦我创建了一个客户端对象,所有api调用都将在集群上运行吗?commnad

使用变量设置Dask工作进程

当工作人员加载并将其放入全局变量(如calib_data)时,我希望分配一个更大的对象(或从磁盘加载)。这对dask工作者有用吗?在这种情况下,客户端方法似乎可以做您想要做的事情。您仍然需要放置变量,因为python中没有真正的全局作用域。例如,某个地方可以是导入模块的任何属性,任何工作人员都可以访问该属性。您也可以将其添加为worker实例本身的属性,但我看不出有明显的理由要这样做 一种可行的方法是,劫持一个随机挑选的内置模块;但我并不特别推荐(见下文) 但是,在继续之前,您是否已经考虑过延迟

Dask在群集之间切换或更改群集上下文

我是达斯克的新手,所以如果这个问题对你来说很愚蠢,请原谅我。在Dask中,我使用的是一个数据量约为50GB的Dask数据帧。这些数据是字符串数据,我需要在将其交给机器学习算法(线程快速)之前对其进行预处理(进程快速)。现在的问题是,当我针对进程设计集群时,数据帧操作速度很快,但对于线程来说速度很慢(但是机器学习的线程速度很快)。因此,我正在寻找一个解决方案,在这个解决方案中,我们可以从进程切换到线程环境 目前,我正在使用process cluster保存预处理数据,然后关闭它,并使用线程化环境启

KilledWorker异常在Dask中意味着什么?

在Dask.distributed scheduler中使用Dask时,我的任务返回时出现KilledWorker异常。这些错误是什么意思?当Dask计划程序不再信任您的任务时,会生成此错误,因为当工作人员意外死亡时,此错误经常出现。它的设计目的是保护集群不受导致工作人员死亡的任务的影响,例如由于SEGFULTS或内存错误 每当某个工作进程意外死亡时,计划程序会记录该工作进程死亡时正在运行的任务。它会在其他员工身上重试这些任务,但也会将其标记为可疑。如果同一任务在多个工作线程死亡时出现,那么调度

dask并行化可以封装在一个类中吗?

是否可以将dask并行化封装在一个类中?在它的最终形式中,我的类在调用run之前将进行大量初始化-我将我的问题精简为框架问题。请注意,该代码适用于LocalCluster,并且类外部的分布式Calc也适用于同一HPC集群。下面是简要的代码以及相应的错误消息: import numpy as np from dask_jobqueue import PBSCluster from dask.distributed import Client from dask.distributed import

解读daskui

我在看Dask用户界面,试图找出每个字段的含义。但是,我无法理解下图中显示的写入字节和读取字节。此外,在某些情况下,写入字节数大于读取字节数。我找不到与此相关的任何文档。写字节和读字节到底意味着什么 我正在使用Joblib dask后端对MNIST数据运行一个简单的逻辑回归任务。 这些字段是关于进出该工作进程的网络流量的。这些是常用的计算术语,指通过网络接口写入的字节的测量,以及从网络接口读取的字节的测量 这是一种带宽利用率测量 一些工具(如Windows的资源监视器)将其称为“已发送”和“已接

dask_lightgbm使用的完整训练集?

我正在阅读估计器的实现(特别是dask_lightgb.core.py中的\u train\u part函数),我没有看到如何使用整个训练集来适应最终的估计器 \u train\u part函数接受布尔参数return\u model,在train函数的实现中(使用客户端.submit对每个工作人员调用\u train\u part),只有当工作人员是“主工作人员”时,return\u model才为真(其本身似乎是随机选择的Dask工作者)。从逻辑上讲,每个工作人员都会被分派整个模型训练集中的

如何在任务完成时获得结果,而不是在Dask中完成所有任务后获得结果?

我有一个dask数据帧,想计算一些独立的任务。有些任务比其他任务快,但我会在完成更长的任务后得到每个任务的结果 我创建了一个本地客户端,并使用Client.compute发送任务。然后我使用future.result来获得每个任务的结果 我使用线程同时请求结果,并测量每个结果的计算时间,如下所示: def get_结果未来,i: t0=时间 打印计算结果,i result=future.result printresult{}采用了{}.formati,time.time-t0 客户端=客户端

使用文件结构将多个模块推送到Dask集群

这几乎是这个问题的延伸 我有一个类似的挑战,除了我试图运行的代码分布在多个模块中,其中一些模块来自一个子目录。为了使我的代码合理有序,我有一个主要的“代码运行器”,它从“功能”文件夹加载不同的模块,以便执行一些数据处理步骤 我知道我可以将文件加载到Dask集群,但我如何才能做到这一点,以保持目录结构,并且我的导入仍然可以像“从要素导入要素1作为f1”(例如) 或者,如果我没有正确地考虑这一点,是否有更好的方法使用Dask实现这一点?人们通常使用Docker、NFS、conda pack或其他一些

来自库的Dask循环开销

当调用另一个库到dask时,如scikit图像对比度拉伸,我意识到dask正在为每个块创建一个结果,存储在内存中或分别溢出到磁盘。然后它尝试合并所有结果。如果您在集群或单台计算机上,并且阵列的数据集很小,那么一切都可以得到很好的控制,这很好。当您使用比RAM或磁盘大得多的数据集时,问题就开始出现了。有没有办法缓解这一问题,或者使用zarr文件格式保存数据,以便在进行更新时更新值?也许这太离奇了。除了购买更多ram之外,其他任何想法都会有所帮助 编辑 我在看关于dask的文档,关于dask块大小

在NERSC集群上将数据帧从jupyter加载到dask mpi

我正在尝试使用NERSC上的dask mpi客户端将一些相对较大(约15GB)的VTK文件加载到dask数据帧中。然而,我很难有效地实现这一点,因为大部分工作是以并行方式在NERSC节点上进行的 这里的基本结构遵循Rollin Thomas关于在NERSC()上使用dask的笔记本。因此,我: 在NERSC上推出Jupyter笔记本电脑 访问交互队列上的节点,定义调度程序文件 启动访问同一计划程序文件的客户端。此时,我可以启动客户端仪表板并观察流程 读取Jupyter节点上的VTK文件,并将其放

Dask 纯度遗传

函数纯度的继承是如何工作的?例如,我希望此代码: In [150]: myObj = delayed(dict,{}, pure=True) In [151]: myObj Out[151]: Delayed('dict-343cc4b6676839eb7fd74272cd0a1ed1') In [152]: myObj = delayed(dict,{}, pure=True) In [153]: myObj Out[153]: Delayed('dict-343cc4b6676839e

元数据帧和单个数据帧之间列顺序不同时的Dask不一致行为

我正在从delayed对象构建一个数据帧,该对象返回单个(pandas)数据帧,我向from\u delayed调用提供meta 碰巧延迟对象返回的列顺序与meta中提供的列顺序不匹配 在这种情况下,dask会变得混乱,并根据所要求的计算将两个排序之间的数据混淆 e、 g: 按照@mdurant的建议,可以强制订购: ddf = ddf.map_partitions(lambda x: x[['date', 'ent', 'val', '(1)', '(2)']]) 来自_delayed的结果

Dask-删除重复索引内存错误

当我试图用下面的代码在一个大数据帧上删除重复的时间戳时,我得到了一个内存错误 import dask.dataframe as dd path = f's3://{container_name}/*' ddf = dd.read_parquet(path, storage_options=opts, engine='fastparquet') ddf = ddf.reset_index().drop_duplicates(subset='timestamp_utc').set_index('t

Dask 在map_块中使用numba函数

我已经在dask阵列上成功地使用了几次map_块。我现在尝试部署一个numba函数来处理每个块,并处理和更改其中一个输入 numba函数接收2个numpy数组,并更新第二个数组。然后在return语句中返回,以使其可用于映射_块 该函数在numpy数组上运行良好,但python在从map_块调用它时崩溃。不作用于输入数组的numba函数行为正常(尽管在这种情况下很难让它们做任何有用的事情)。 这是已知的限制吗?虫子?我用错了吗?! 更新 我终于把它归结为一个可复制的例子,使用一个简单的numba

使用现有dataproc群集运行dask

我有一个运行在谷歌云平台上的dataproc集群。我打算在dask客户机中传递这个集群,而不是初始化一个新的dask集群 但是,我不能直接使用我的dataproc集群 #Instead of : cluster = YarnCluster(environment='environment.tar.gz',worker_vcores=2, worker_memory="8GiB") cluster.scale(10) client = Client(cluster) #Directly usin

如果某些块仅包含NaN值,则Dask distributed不运行SVD

首先感谢您为dask提供其所有功能,非常感谢 但是,使用dask.distributed在光栅化数据集上运行SVD时,如果只有单个数据块只包含NaN值,那么它似乎会失败,尽管数据集的大多数确实包含正确的值 我使用xarray.open_mfdataset(chunks={…})读取数据集,并尝试设置chunksize,以便包中使用的SVD计算(dask.array.linalg)通过使用dask.distributed客户端,利用集群提供的核心 尺寸:(时间:8760,x:1000,y:840

GPU Dask Cuda群集:client.submit

我非常熟悉为CPU分发的Dask。我想探索在GPU内核上运行代码的过渡。当我向LocalCudCluster提交任务时,会出现以下错误: ValueError: tuple is not allowed for map key 这是我的测试用例: import cupy as cp import numpy as np from dask_cuda import LocalCUDACluster from dask.distributed import Client cluster = Lo

每个工人有多少dask工作

如果我启动一个包含N个工作进程的dask集群,然后使用cluster.compute提交N个以上的作业,dask是尝试同时运行所有作业(通过在每个工作进程上调度多个作业),还是作业排队并按顺序运行 我最近这样做的经验似乎暗示了后者。每个作业都非常占用内存,提交的作业比工作人员多,这会导致所有作业由于内存问题而崩溃 有没有办法强制dask一次只能在一个工作线程上运行一个作业,并对其他作业排队?默认行为由集群大小设置。如果辅助线程数大于4,dask将尝试猜测每个辅助线程中要使用的线程数。如果要更改此

Dask 并行地从RESTAPI加载数据帧

我希望通过HTTP POST向REST API发出并行请求,该POST返回CSV(可选JSON)并将结果加载到数据帧中。有这种情况的例子吗?我找过了,运气不好。下面是我可以用来搜索的cURL命令示例: https://someserver/search \ -d search="${1}" \ -d count=100 -d output=csv 对于每个被某个搜索谓词(比如state=“MD”)划分的请求,搜索字符串都需要不同。如果您阅读dask的

Dask 如何解决;并非所有部门都是已知的”;错误?

我正在尝试使用groupby过滤Dask数据帧 df = df.set_index('ngram'); sizes = df.groupby('ngram').size(); df = df[sizes > 15]; 但是,df.head(15)抛出错误ValueError:并非所有分区都已知,无法对齐分区。请使用“设置索引”来设置索引。。尺寸上的分区不清楚: >>> df.known_divisions True >>> sizes.known_di

Dask数据帧获取第二高的值和列名

此代码为我提供了最高的值和列名 import numpy as np import pandas as pd import dask.dataframe as dd cols=[0,1,2,3,4] df = pd.DataFrame(np.random.randn(1000, len(cols)), columns=cols) ddf = dd.from_pandas(df, npartitions=4) ddf['max_col'] = ddf[cols].idxmax(axis=1) d

Dask一个热编码器句柄“未知=”;忽略“;,工作?

我理解这一点,它现在还没有处理,但它阻止了我对经过培训的OneHotEncoder/管道进行实时编码(比如在实时API服务中) 如果碰巧需要对以前没有见过的东西进行编码,人们如何解决需要对数据进行实时编码的问题 谢谢

Dask数据帧上的值错误

我正在使用dask读取csv文件。但是,由于以下错误,我无法对其应用或计算任何操作: 您知道这个错误是怎么回事吗?如何修复它?在dask中读取csv文件时,如果无法识别正确的列数据类型,就会出现错误 例如,我们使用dask读取csv文件,如下所示: import dask.dataframe as dd df = dd.read_csv('\data\file.txt', sep='\t', header='infer') 这将提示上述错误 为了解决这个问题,@mrocklin在这个评论中建

使用dask就地计算 短版

我有一个dask数组,它的图形最终基于底部的一组numpy数组,并对它们应用元素操作。使用da.store计算数组并将结果存储回原始备份numpy数组是否安全,从而使整个过程成为就地操作 如果您认为“您使用的dask是错误的”,那么请参阅下面的详细版本,了解我为什么觉得有必要这样做 长版本 我将dask用于一个应用程序,其中原始数据来自内存中的numpy数组,该数组包含从科学仪器收集的数据。目标是用原始数据填充大部分RAM(比如75%+),这意味着没有足够的内存来创建内存中的副本。这使得它在语义

有没有办法知道dask工作程序是在CPU设备上运行还是在GPU设备上运行。?

假设一个dask集群有一些CPU设备和一些GPU设备。每个设备运行一个dask工作进程。现在,问题是我如何发现dask工作程序的底层设备是CPU或GPU 例如:-如果dask工作程序在CPU设备上运行,我应该知道它在CPU上运行,或者如果dask工作程序在GPU设备上运行,我应该以编程方式知道设备类型。有什么方法可以通过编程来了解这一点吗?上面评论中的链接答案是关于提前按资源标记不同的工人,然后根据他们可能需要的资源分配任务 相反,也许您希望以异构的方式运行计算,也就是说,您不介意哪个任务可以在

异步实时并行分布式Dask

我正在阅读关于dask.distributed的文档,看起来我可以通过client.submit()向分布式集群提交函数 我有一个现有函数some_func,它异步抓取单个文档(比如,文本文件),我想获取原始文档,抓取所有不包含元音的单词,并将其推回到另一个数据库中。此数据处理步骤是阻塞的 假设有数百万个文档,分布式集群只有10个节点和1个可用进程(即一次只能处理10个文档),dask.distributed将如何处理需要处理的文档流 下面是一些示例代码: client = dask.distr

使用Dask_xgboost-train方法时,Dask挂起

我试图从位于的dask ml文档中复制dask xgboost示例。不幸的是,Dask似乎没有完成培训,我很难找到错误和警告的含义。这是我的密码: def main(): cluster = LocalCluster() dask_client = Client(cluster) x, y = make_classification(n_samples=1000, n_features=10, chunks=10, n_informative=4, random_sta

Dask client.upload_file()用于嵌套模块

我有一个项目结构如下 - topmodule/ - childmodule1/ - my_func1.py - childmodule2/ - my_func2.py - common.py - __init__.py client = Client(YarnCluster()) client.submit(MyFuncClass1.execute) 在Dask集群边缘节点上的Jupyter笔记本中,我正在执行以下操作 from topm

Dask客户端检测本地默认群集已在运行

这有可能吗?您不应该在同一个Python会话中创建多个客户端。也许有必要更深入地了解为什么您要多次给客户打电话 如果已经有一个Dask群集在默认地址上运行,则可以设置Dask_调度程序_地址环境变量,该变量将指示客户端在那里查看,而不是创建本地群集 from dask.distributed import Client Client() Client(do_not_spawn_new_if_default_address_in_use=True) # should not spawn a n

AttributeError:模块“dask”没有属性“delayed”

使用Pycharm社区2018.1.4 Python 3.6 Dask 2.8.1 尝试在我的一些方法上实现dask延迟,但出现错误 AttributeError: module 'dask' has no attribute 'delayed'. 这显然不是真的,所以我想知道我做错了什么。我的实施结构如下: 进口达斯克 def总管 对于enumaratefilenames中的i,fn: 数据={} 对于0,2范围内的x: data.updatedask.delayedload_datafn,

使用多列作为索引旋转dask数据帧

我有以下格式的Dask数据帧: date hour device param value 20190701 21 dev_01 att_1 0.000000 20190718 22 dev_01 att_2 20.000000 20190718 22 dev_01 att_3 18.611111 20190701 21 dev_01 att_4 18.706083 20190718 22 dev_01 att

Dask pyarrow-识别写入拼花地板数据集时使用的片段或过滤器?

我的用例是,我希望将文件路径或过滤器作为xcom传递给Airflow中的任务,以便我的下一个任务可以读取刚刚处理的数据 任务A将表写入分区数据集,并生成大量拼花文件片段-->任务B稍后将这些片段作为数据集读取。不过,我只需要读取相关数据,而不需要读取可能有数百万行的整个数据集 我测试了两种方法: 在我完成写入数据集后立即列出修改过的文件。这将为我提供一个路径列表,我可以在下一个任务中调用ds.dataset(路径)。我可以在这些路径上使用partitioning.parse(),或者检查片段以获

dask工人给redis写信

假设我有一个函数,它执行一些处理并将结果存储到redis服务器 r = redis.StrictRedis() def process(data): (...do some work...) r.put(...) 现在我有了大量的数据,我想使用dask来并行化这个过程。类似于 from dask.distributed imoprt Client client = Client() for x in data: client.submit(process,x) 但是

从并行txt文件读取dask数据帧

我有两个(或更多)并行文本文件存储在S3中-即第一个文件中的第1行对应于第二个文件中的第1行等。我想将这些文件作为列读取到单个dask数据帧中。做这件事最好/最简单/最快的方法是什么 另外,我可以将它们读入一个单独的数据帧,但是我不能在索引上连接它们,因为数据帧索引值似乎既不唯一也不单调。同时,行的对应关系由它们在每个文件中的位置来定义。不幸的是,dask.dataframe按字节而不是按行分解大型文件。如果不先通读大文件中的某一行,则很难找到该行。对,是否可以生成全局单调(或至少唯一)索引?我

dask.bag.from_序列是否保持顺序?

我试图确定当使用from_sequence函数实例化时,dask.bag是否保留顺序 我从其他问题中了解到,订购包(例如)不保证,但我不清楚原因。 这是否意味着“现在它通常是为列表保留的,但是我们可能不得不更改它,因为稍后内部会有一些内容” 除了对复杂的数据结构进行测试外,我还运行了下面的简单测试。在这两种情况下,排序似乎都得到了保留。只是不确定我是否需要做更多的工作,例如,使用zip获取表单的元组列表(索引、对象),并将我的bag从中取出,这样我就可以在应用compute后按索引排序 来自da

Dask 凸矩阵优化

我目前正在尝试为凸矩阵优化实现Dask。目标是对内存不足的矩阵执行矩阵优化(因子分解)。以一个高矩阵作为输入,两个高矩阵作为输出,再加上一些参数(如收敛性等),我使用dask数组将原始矩阵和itermediate/output矩阵分块。最后,迭代是连续的,即前一次迭代的输出被用作新迭代的输入(参见下面的简单示例) 在每次迭代中,必须计算并检查两个标准的收敛性(最终if语句)。但是,如果我执行下面给出的代码,我看到的是Dask计算标准(由if语句强制执行),但在每次迭代中重新计算其他矩阵:即迭代1

Dask 异步Xarray写入Zarr

全部。我正在使用一个Dask分布式集群在一个循环中写入Zarr+Dask支持的Xarray数据集,dataset.to_Zarr正在阻塞。当存在阻碍循环继续的散乱块时,这确实会减慢速度。有没有一种方法可以异步执行.to_zarr,这样循环就可以继续进行下一次数据集写入,而不会被几个散乱的数据块阻塞 使用分布式调度程序,无需任何特殊努力即可获得异步行为。例如,如果您正在执行arr.to_zarr,那么实际上您将等待完成。但是,您可以执行以下操作: client = Client(...) out

  1    2   3   4   5   6  ... 下一页 共 6 页