标签: Dask
dask-distributed
我正在使用一个dask.distributed集群,我想将一个大数据帧保存到S3的单个CSV文件中,如果可能的话,保持分区的顺序(默认情况下,将数据帧写入多个文件,每个分区一个)。此外,此文件存储操作还应作为延迟/延迟任务执行(应在集群工作程序上执行)
我能想到的第一个解决方案是将分区存储在临时S3位置,然后在延迟函数中使用S3 multipart upload将这些文件合并/上传到一起。临时存储是这里的一个缺点
可以使用dataframe.to_delayed()完成吗?上载需要由单个延迟函数
我在一个大型3D数据阵列(气象数据:纬度、经度、时间)上使用xarray.interp,根据离散映射函数f将值(风速)映射到新值。
插值方法似乎只使用一个核进行计算,这使得该过程效率低下。我不知道如何使xarray使用多个内核来完成此任务
我确实通过htop和dask仪表板为xarray.interp监控了计算。
htop仅显示要使用的一个核心,仪表板不显示任何工人的任何活动。我能观察到的唯一dask活动是从磁盘加载netcdf数据文件。如果使用.load()预加载数据,则此dask活动将消失
正在获取错误
MemoryError:无法分配形状为(15145488917,)且数据类型为int64的数组
…尝试以下合并时
master = dd.merge(df1, df2)
以前的代码是
df1.count().compute()
…具有输出
den 312019
num 312019
dtype: int64
den 970531
num 970531
dtype: int64
……还有
df2.count().compute()
…具有输出
den
标签: Dask
parquetfastparquet
我已经从位于同一文件夹中的多个拼花文件创建了一个拼花文件。每个文件对应一个分区
拼花地板文件是使用Python concurrent.futures在不同的进程中创建的。下面是我在一个进程中运行的代码示例:
`df`是一个标准的熊猫数据帧,具有
22列不同类型,最多100e3行。
设置索引
df.set_indexcid,原地=真
写入单个文件
fastparquet.writefpath,df,compression='snappy,file\u scheme='simple
df最多包含10
在Centos 7上安装了带有“pip安装dask[complete]分布式--升级”Python 2.7的dask。
运行:
获取:
Process-1:回溯(最近一次调用最后一次):文件
“/usr/lib64/python2.7/multiprocessing/process.py”,第258行,在
_bootstrap self.run()文件“/usr/lib64/python2.7/multiprocessing/process.py”,运行中的第114行
self.\u targe
标签: Dask
dask-distributeddask-delayeddask-dataframe
我正在尝试从SQL连接加载Dask数据帧。根据,必须传入一个索引列。如果可能没有好的列作为索引,我该怎么办
这是合适的替代品吗
# Break SQL Query into chunks
chunks = []
num_chunks = math.ceil(num_records / chunk_size)
# Run query for each chunk on Dask workers
for i in range(num_chunks):
query = 'SELECT * F
标签: Dask
dask-distributeddask-delayed
将未来传递给Dask延迟功能以使未来保持稳定的最佳方式是什么?换句话说,我们如何确保函数将获得实际的未来,而不是它所代表的结果?通常语义是dask.delayed函数得到具体的结果,而不是dask-y函数。今天,如果没有一些技巧,这是不容易支持的
尽管如此,我还是推荐以下技巧:
把你的未来放在一个
谢谢这似乎是合理的。还发现dask.core.literal有效,使用它是否合理?人力资源管理,是的,这似乎是一个更好的主意。我建议提高PR,将其推送到顶级API中,并ping jcrist,看看他的
我正在尝试将json文件加载到dask df中
files = glob.glob('**/*.json', recursive=True)
df = dd.read_json(files, lines = False)
数据中缺少一些值,并且一些文件有额外的列。
有没有办法指定一个列列表,以便所有可能的列都存在于连接的dask df中?
此外,它不能处理缺少的值吗?我在尝试计算df时遇到以下错误:
ValueError: Metadata mismatch found in `from_de
标签: Dask
python-xarraydask-distributed
我需要通过线性插值对一些卫星图像进行采样,这些图像组织在一个数据阵列中。
在本地运行代码之前,我没有问题,但是如果我尝试在分布式系统上复制插值,我会返回以下错误:
`Could not serialize object of type tuple`
要复制这个问题,需要在分布式或本地环境之间切换。
这里是代码的分布式版本
n_time = 365
px = 2000
lat = np.linspace(19., 4., px)
lon = np.linspace(34., 53., px)
标签: Dask
dask-distributed
我想知道Dask是否是处理视频文件的好工具。在本地,我使用OpenCV一次读取一帧并处理它。我认为,如果视频的每一帧都由单独的Dask任务处理,那将是理想的。另外,对于多个时间同步的视频文件,我想知道如何从每个摄像头一次读取一帧,并在一个任务中处理两个摄像头的帧
我不认为Dask支持生成器函数,但如果它真的支持,我认为这将是理想的。生成器任务将在任务图中一次生成一帧视频
下面是一个与我正在做的类似的示例脚本。它从多个视频文件中的每个文件中取出一帧,裁剪并调整每个文件的大小到标准大小,然后将这些文
标签: Dask
dask-delayeddask-dataframe
我想使用dask数据帧中的列创建一个dask系列,并将其添加到dask数据帧中。创建新系列的函数可以是任意复杂的,并且可能使用dask.dataframe没有的函数。我的想法是为该函数使用dask.delayed装饰器,并从单个delayed对象创建一个dask系列。然后,我遇到了一个问题:必须将这个新系列的分区/分区与原始dask数据帧对齐,以便将其作为列附加。我可以假设由该函数创建的序列始终具有与原始dask数据帧相同的长度和索引。所以我尝试了一些“肮脏”的伎俩来实现它。见下文:
impor
当使用Dask的分布式调度程序时,我有一个在远程工作者上运行的任务,我想停止该任务
我该怎么阻止它?我知道cancel方法,但是如果任务已经开始执行,那么它似乎不起作用。如果它还没有运行
如果任务尚未开始运行,则可以通过取消关联的未来任务来取消该任务
future = client.submit(func, *args) # start task
future.cancel() # cancel task
如果您使用的是dask集合,则可以使用cli
标签: Dask
dask-distributed
在下面的代码中,为什么dd.read_csv在集群上运行?
client.read_csv应在群集上运行
import dask.dataframe as dd
from dask.distributed import Client
client=Client('10.31.32.34:8786')
dd.read_csv('file.csv',blocksize=10e7)
dd.compute()
在这种情况下,一旦我创建了一个客户端对象,所有api调用都将在集群上运行吗?commnad
当工作人员加载并将其放入全局变量(如calib_data)时,我希望分配一个更大的对象(或从磁盘加载)。这对dask工作者有用吗?在这种情况下,客户端方法似乎可以做您想要做的事情。您仍然需要放置变量,因为python中没有真正的全局作用域。例如,某个地方可以是导入模块的任何属性,任何工作人员都可以访问该属性。您也可以将其添加为worker实例本身的属性,但我看不出有明显的理由要这样做
一种可行的方法是,劫持一个随机挑选的内置模块;但我并不特别推荐(见下文)
但是,在继续之前,您是否已经考虑过延迟
标签: Dask
dask-distributeddask-dataframedask-ml
我是达斯克的新手,所以如果这个问题对你来说很愚蠢,请原谅我。在Dask中,我使用的是一个数据量约为50GB的Dask数据帧。这些数据是字符串数据,我需要在将其交给机器学习算法(线程快速)之前对其进行预处理(进程快速)。现在的问题是,当我针对进程设计集群时,数据帧操作速度很快,但对于线程来说速度很慢(但是机器学习的线程速度很快)。因此,我正在寻找一个解决方案,在这个解决方案中,我们可以从进程切换到线程环境
目前,我正在使用process cluster保存预处理数据,然后关闭它,并使用线程化环境启
在Dask.distributed scheduler中使用Dask时,我的任务返回时出现KilledWorker异常。这些错误是什么意思?当Dask计划程序不再信任您的任务时,会生成此错误,因为当工作人员意外死亡时,此错误经常出现。它的设计目的是保护集群不受导致工作人员死亡的任务的影响,例如由于SEGFULTS或内存错误
每当某个工作进程意外死亡时,计划程序会记录该工作进程死亡时正在运行的任务。它会在其他员工身上重试这些任务,但也会将其标记为可疑。如果同一任务在多个工作线程死亡时出现,那么调度
标签: Dask
dask-distributed
是否可以将dask并行化封装在一个类中?在它的最终形式中,我的类在调用run之前将进行大量初始化-我将我的问题精简为框架问题。请注意,该代码适用于LocalCluster,并且类外部的分布式Calc也适用于同一HPC集群。下面是简要的代码以及相应的错误消息:
import numpy as np
from dask_jobqueue import PBSCluster
from dask.distributed import Client
from dask.distributed import
标签: Dask
dask-distributed
我在看Dask用户界面,试图找出每个字段的含义。但是,我无法理解下图中显示的写入字节和读取字节。此外,在某些情况下,写入字节数大于读取字节数。我找不到与此相关的任何文档。写字节和读字节到底意味着什么
我正在使用Joblib dask后端对MNIST数据运行一个简单的逻辑回归任务。
这些字段是关于进出该工作进程的网络流量的。这些是常用的计算术语,指通过网络接口写入的字节的测量,以及从网络接口读取的字节的测量
这是一种带宽利用率测量
一些工具(如Windows的资源监视器)将其称为“已发送”和“已接
我正在阅读估计器的实现(特别是dask_lightgb.core.py中的\u train\u part函数),我没有看到如何使用整个训练集来适应最终的估计器
\u train\u part函数接受布尔参数return\u model,在train函数的实现中(使用客户端.submit对每个工作人员调用\u train\u part),只有当工作人员是“主工作人员”时,return\u model才为真(其本身似乎是随机选择的Dask工作者)。从逻辑上讲,每个工作人员都会被分派整个模型训练集中的
标签: Dask
dask-distributed
我有一个dask数据帧,想计算一些独立的任务。有些任务比其他任务快,但我会在完成更长的任务后得到每个任务的结果
我创建了一个本地客户端,并使用Client.compute发送任务。然后我使用future.result来获得每个任务的结果
我使用线程同时请求结果,并测量每个结果的计算时间,如下所示:
def get_结果未来,i:
t0=时间
打印计算结果,i
result=future.result
printresult{}采用了{}.formati,time.time-t0
客户端=客户端
这几乎是这个问题的延伸
我有一个类似的挑战,除了我试图运行的代码分布在多个模块中,其中一些模块来自一个子目录。为了使我的代码合理有序,我有一个主要的“代码运行器”,它从“功能”文件夹加载不同的模块,以便执行一些数据处理步骤
我知道我可以将文件加载到Dask集群,但我如何才能做到这一点,以保持目录结构,并且我的导入仍然可以像“从要素导入要素1作为f1”(例如)
或者,如果我没有正确地考虑这一点,是否有更好的方法使用Dask实现这一点?人们通常使用Docker、NFS、conda pack或其他一些
标签: Dask
scikit-imagezarr
当调用另一个库到dask时,如scikit图像对比度拉伸,我意识到dask正在为每个块创建一个结果,存储在内存中或分别溢出到磁盘。然后它尝试合并所有结果。如果您在集群或单台计算机上,并且阵列的数据集很小,那么一切都可以得到很好的控制,这很好。当您使用比RAM或磁盘大得多的数据集时,问题就开始出现了。有没有办法缓解这一问题,或者使用zarr文件格式保存数据,以便在进行更新时更新值?也许这太离奇了。除了购买更多ram之外,其他任何想法都会有所帮助
编辑
我在看关于dask的文档,关于dask块大小
我正在尝试使用NERSC上的dask mpi客户端将一些相对较大(约15GB)的VTK文件加载到dask数据帧中。然而,我很难有效地实现这一点,因为大部分工作是以并行方式在NERSC节点上进行的
这里的基本结构遵循Rollin Thomas关于在NERSC()上使用dask的笔记本。因此,我:
在NERSC上推出Jupyter笔记本电脑
访问交互队列上的节点,定义调度程序文件
启动访问同一计划程序文件的客户端。此时,我可以启动客户端仪表板并观察流程
读取Jupyter节点上的VTK文件,并将其放
函数纯度的继承是如何工作的?例如,我希望此代码:
In [150]: myObj = delayed(dict,{}, pure=True)
In [151]: myObj
Out[151]: Delayed('dict-343cc4b6676839eb7fd74272cd0a1ed1')
In [152]: myObj = delayed(dict,{}, pure=True)
In [153]: myObj
Out[153]: Delayed('dict-343cc4b6676839e
我正在从delayed对象构建一个数据帧,该对象返回单个(pandas)数据帧,我向from\u delayed调用提供meta
碰巧延迟对象返回的列顺序与meta中提供的列顺序不匹配
在这种情况下,dask会变得混乱,并根据所要求的计算将两个排序之间的数据混淆
e、 g:
按照@mdurant的建议,可以强制订购:
ddf = ddf.map_partitions(lambda x: x[['date', 'ent', 'val', '(1)', '(2)']])
来自_delayed的结果
当我试图用下面的代码在一个大数据帧上删除重复的时间戳时,我得到了一个内存错误
import dask.dataframe as dd
path = f's3://{container_name}/*'
ddf = dd.read_parquet(path, storage_options=opts, engine='fastparquet')
ddf = ddf.reset_index().drop_duplicates(subset='timestamp_utc').set_index('t
我已经在dask阵列上成功地使用了几次map_块。我现在尝试部署一个numba函数来处理每个块,并处理和更改其中一个输入
numba函数接收2个numpy数组,并更新第二个数组。然后在return语句中返回,以使其可用于映射_块
该函数在numpy数组上运行良好,但python在从map_块调用它时崩溃。不作用于输入数组的numba函数行为正常(尽管在这种情况下很难让它们做任何有用的事情)。
这是已知的限制吗?虫子?我用错了吗?!
更新
我终于把它归结为一个可复制的例子,使用一个简单的numba
标签: Dask
google-cloud-dataprocdask-distributed
我有一个运行在谷歌云平台上的dataproc集群。我打算在dask客户机中传递这个集群,而不是初始化一个新的dask集群
但是,我不能直接使用我的dataproc集群
#Instead of :
cluster = YarnCluster(environment='environment.tar.gz',worker_vcores=2, worker_memory="8GiB")
cluster.scale(10)
client = Client(cluster)
#Directly usin
标签: Dask
svddask-distributed
首先感谢您为dask提供其所有功能,非常感谢
但是,使用dask.distributed在光栅化数据集上运行SVD时,如果只有单个数据块只包含NaN值,那么它似乎会失败,尽管数据集的大多数确实包含正确的值
我使用xarray.open_mfdataset(chunks={…})读取数据集,并尝试设置chunksize,以便包中使用的SVD计算(dask.array.linalg)通过使用dask.distributed客户端,利用集群提供的核心
尺寸:(时间:8760,x:1000,y:840
标签: Dask
dask-distributedcupy
我非常熟悉为CPU分发的Dask。我想探索在GPU内核上运行代码的过渡。当我向LocalCudCluster提交任务时,会出现以下错误:
ValueError: tuple is not allowed for map key
这是我的测试用例:
import cupy as cp
import numpy as np
from dask_cuda import LocalCUDACluster
from dask.distributed import Client
cluster = Lo
标签: Dask
dask-distributeddask-delayed
如果我启动一个包含N个工作进程的dask集群,然后使用cluster.compute提交N个以上的作业,dask是尝试同时运行所有作业(通过在每个工作进程上调度多个作业),还是作业排队并按顺序运行
我最近这样做的经验似乎暗示了后者。每个作业都非常占用内存,提交的作业比工作人员多,这会导致所有作业由于内存问题而崩溃
有没有办法强制dask一次只能在一个工作线程上运行一个作业,并对其他作业排队?默认行为由集群大小设置。如果辅助线程数大于4,dask将尝试猜测每个辅助线程中要使用的线程数。如果要更改此
我希望通过HTTP POST向REST API发出并行请求,该POST返回CSV(可选JSON)并将结果加载到数据帧中。有这种情况的例子吗?我找过了,运气不好。下面是我可以用来搜索的cURL命令示例:
https://someserver/search \
-d search="${1}" \
-d count=100 -d output=csv
对于每个被某个搜索谓词(比如state=“MD”)划分的请求,搜索字符串都需要不同。如果您阅读dask的
我正在尝试使用groupby过滤Dask数据帧
df = df.set_index('ngram');
sizes = df.groupby('ngram').size();
df = df[sizes > 15];
但是,df.head(15)抛出错误ValueError:并非所有分区都已知,无法对齐分区。请使用“设置索引”来设置索引。。尺寸上的分区不清楚:
>>> df.known_divisions
True
>>> sizes.known_di
此代码为我提供了最高的值和列名
import numpy as np
import pandas as pd
import dask.dataframe as dd
cols=[0,1,2,3,4]
df = pd.DataFrame(np.random.randn(1000, len(cols)), columns=cols)
ddf = dd.from_pandas(df, npartitions=4)
ddf['max_col'] = ddf[cols].idxmax(axis=1)
d
我理解这一点,它现在还没有处理,但它阻止了我对经过培训的OneHotEncoder/管道进行实时编码(比如在实时API服务中)
如果碰巧需要对以前没有见过的东西进行编码,人们如何解决需要对数据进行实时编码的问题
谢谢
我正在使用dask读取csv文件。但是,由于以下错误,我无法对其应用或计算任何操作:
您知道这个错误是怎么回事吗?如何修复它?在dask中读取csv文件时,如果无法识别正确的列数据类型,就会出现错误
例如,我们使用dask读取csv文件,如下所示:
import dask.dataframe as dd
df = dd.read_csv('\data\file.txt', sep='\t', header='infer')
这将提示上述错误
为了解决这个问题,@mrocklin在这个评论中建
我有一个dask数组,它的图形最终基于底部的一组numpy数组,并对它们应用元素操作。使用da.store计算数组并将结果存储回原始备份numpy数组是否安全,从而使整个过程成为就地操作
如果您认为“您使用的dask是错误的”,那么请参阅下面的详细版本,了解我为什么觉得有必要这样做
长版本
我将dask用于一个应用程序,其中原始数据来自内存中的numpy数组,该数组包含从科学仪器收集的数据。目标是用原始数据填充大部分RAM(比如75%+),这意味着没有足够的内存来创建内存中的副本。这使得它在语义
标签: Dask
dask-distributed
假设一个dask集群有一些CPU设备和一些GPU设备。每个设备运行一个dask工作进程。现在,问题是我如何发现dask工作程序的底层设备是CPU或GPU
例如:-如果dask工作程序在CPU设备上运行,我应该知道它在CPU上运行,或者如果dask工作程序在GPU设备上运行,我应该以编程方式知道设备类型。有什么方法可以通过编程来了解这一点吗?上面评论中的链接答案是关于提前按资源标记不同的工人,然后根据他们可能需要的资源分配任务
相反,也许您希望以异构的方式运行计算,也就是说,您不介意哪个任务可以在
标签: Dask
dask-distributed
我正在阅读关于dask.distributed的文档,看起来我可以通过client.submit()向分布式集群提交函数
我有一个现有函数some_func,它异步抓取单个文档(比如,文本文件),我想获取原始文档,抓取所有不包含元音的单词,并将其推回到另一个数据库中。此数据处理步骤是阻塞的
假设有数百万个文档,分布式集群只有10个节点和1个可用进程(即一次只能处理10个文档),dask.distributed将如何处理需要处理的文档流
下面是一些示例代码:
client = dask.distr
我试图从位于的dask ml文档中复制dask xgboost示例。不幸的是,Dask似乎没有完成培训,我很难找到错误和警告的含义。这是我的密码:
def main():
cluster = LocalCluster()
dask_client = Client(cluster)
x, y = make_classification(n_samples=1000, n_features=10, chunks=10, n_informative=4, random_sta
标签: Dask
dask-distributed
我有一个项目结构如下
- topmodule/
- childmodule1/
- my_func1.py
- childmodule2/
- my_func2.py
- common.py
- __init__.py
client = Client(YarnCluster())
client.submit(MyFuncClass1.execute)
在Dask集群边缘节点上的Jupyter笔记本中,我正在执行以下操作
from topm
标签: Dask
dask-distributed
这有可能吗?您不应该在同一个Python会话中创建多个客户端。也许有必要更深入地了解为什么您要多次给客户打电话
如果已经有一个Dask群集在默认地址上运行,则可以设置Dask_调度程序_地址环境变量,该变量将指示客户端在那里查看,而不是创建本地群集
from dask.distributed import Client
Client()
Client(do_not_spawn_new_if_default_address_in_use=True) # should not spawn a n
使用Pycharm社区2018.1.4
Python 3.6
Dask 2.8.1
尝试在我的一些方法上实现dask延迟,但出现错误
AttributeError: module 'dask' has no attribute 'delayed'.
这显然不是真的,所以我想知道我做错了什么。我的实施结构如下:
进口达斯克
def总管
对于enumaratefilenames中的i,fn:
数据={}
对于0,2范围内的x:
data.updatedask.delayedload_datafn,
我有以下格式的Dask数据帧:
date hour device param value
20190701 21 dev_01 att_1 0.000000
20190718 22 dev_01 att_2 20.000000
20190718 22 dev_01 att_3 18.611111
20190701 21 dev_01 att_4 18.706083
20190718 22 dev_01 att
我的用例是,我希望将文件路径或过滤器作为xcom传递给Airflow中的任务,以便我的下一个任务可以读取刚刚处理的数据
任务A将表写入分区数据集,并生成大量拼花文件片段-->任务B稍后将这些片段作为数据集读取。不过,我只需要读取相关数据,而不需要读取可能有数百万行的整个数据集
我测试了两种方法:
在我完成写入数据集后立即列出修改过的文件。这将为我提供一个路径列表,我可以在下一个任务中调用ds.dataset(路径)。我可以在这些路径上使用partitioning.parse(),或者检查片段以获
假设我有一个函数,它执行一些处理并将结果存储到redis服务器
r = redis.StrictRedis()
def process(data):
(...do some work...)
r.put(...)
现在我有了大量的数据,我想使用dask来并行化这个过程。类似于
from dask.distributed imoprt Client
client = Client()
for x in data:
client.submit(process,x)
但是
标签: Dask
dask-distributed
我有两个(或更多)并行文本文件存储在S3中-即第一个文件中的第1行对应于第二个文件中的第1行等。我想将这些文件作为列读取到单个dask数据帧中。做这件事最好/最简单/最快的方法是什么
另外,我可以将它们读入一个单独的数据帧,但是我不能在索引上连接它们,因为数据帧索引值似乎既不唯一也不单调。同时,行的对应关系由它们在每个文件中的位置来定义。不幸的是,dask.dataframe按字节而不是按行分解大型文件。如果不先通读大文件中的某一行,则很难找到该行。对,是否可以生成全局单调(或至少唯一)索引?我
我试图确定当使用from_sequence函数实例化时,dask.bag是否保留顺序
我从其他问题中了解到,订购包(例如)不保证,但我不清楚原因。
这是否意味着“现在它通常是为列表保留的,但是我们可能不得不更改它,因为稍后内部会有一些内容”
除了对复杂的数据结构进行测试外,我还运行了下面的简单测试。在这两种情况下,排序似乎都得到了保留。只是不确定我是否需要做更多的工作,例如,使用zip获取表单的元组列表(索引、对象),并将我的bag从中取出,这样我就可以在应用compute后按索引排序
来自da
我目前正在尝试为凸矩阵优化实现Dask。目标是对内存不足的矩阵执行矩阵优化(因子分解)。以一个高矩阵作为输入,两个高矩阵作为输出,再加上一些参数(如收敛性等),我使用dask数组将原始矩阵和itermediate/output矩阵分块。最后,迭代是连续的,即前一次迭代的输出被用作新迭代的输入(参见下面的简单示例)
在每次迭代中,必须计算并检查两个标准的收敛性(最终if语句)。但是,如果我执行下面给出的代码,我看到的是Dask计算标准(由if语句强制执行),但在每次迭代中重新计算其他矩阵:即迭代1
标签: Dask
python-xarrayzarr
全部。我正在使用一个Dask分布式集群在一个循环中写入Zarr+Dask支持的Xarray数据集,dataset.to_Zarr正在阻塞。当存在阻碍循环继续的散乱块时,这确实会减慢速度。有没有一种方法可以异步执行.to_zarr,这样循环就可以继续进行下一次数据集写入,而不会被几个散乱的数据块阻塞 使用分布式调度程序,无需任何特殊努力即可获得异步行为。例如,如果您正在执行arr.to_zarr,那么实际上您将等待完成。但是,您可以执行以下操作:
client = Client(...)
out
1 2 3 4 5 6 ...
下一页 最后一页 共 11 页