我正在阅读并在那里找到了关于窗口操作符的以下部分:
“第一个复杂的重载允许我们控制窗口何时关闭每次创建窗口时都会调用windowClosingSelector函数。在订阅时以及窗口关闭后立即创建窗口;当windowClosingSelector中的序列产生值时,窗口关闭。忽略该值,因此序列值的类型无关紧要;实际上,您只需完成windowClosingSelector中的序列即可关闭窗口
这似乎与公共最终可观察窗口不同(Func0正如我所建议的,我在Github页面上提出了这个问题,并且已经解决了
我的用例是视图组中的单击侦听器:
每个视图发布一个可观察的冷API
视图组管理一组视图,并提供从子视图发出单击的可观察的API
我希望每个子视图在添加到复合可观察对象时都被订阅,但前提是复合对象有订阅。如果没有,则应仅在订阅组合时订阅子项
到目前为止,我的解决方案是:
PublishSubject<ViewClick> composite;
Map<View, Subscription> subscriptions;
void addSource(View view
我正在用RxJava观察一个数据库表。所有SQL SELECT查询都返回一个热可观测值,每当我在同一个表中插入/更新/删除行时,我都会重新运行任何已订阅的SELECT查询,并在热可观测值查询上调用onNext(queryResult)
database.select("SELECT * FROM Foo) // returns an observable
.map(doSomething)
.filter(aFilter)
.subscribe (
我的用例是使用Snackbar在Android应用程序中显示通知。我一次只能显示一个通知,因此我使用onBackpressureBuffer,并且仅在Snackbar关闭时请求下一项。这工作得很好,但是如果缓冲区溢出,我希望有一个冷却期,这样我们就不会有恒定的通知流。换句话说,当缓冲区溢出时,跳过新的排放一段时间
到目前为止,我有类似的东西
observable
.onBackpressureBuffer(
bufferSize,
() -> { /
这是一个非常简单的示例:
public static void main(String[] args) {
Observable.from(ListUtils.asList(1, 2, 3)).retry(3).subscribe(new Observer<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted"
我想尝试一下非常简单的RxJava代码,如下所示
int[] test = {1,2,3,4};
Observable<Integer> findAverage = Observable.fromArray(test);
averageInteger(findAverage).subscribe(System.out::println);
int[]test={1,2,3,4};
可观测的findAverage=可观测的数组(测试);
averageInteger(find
标签: Rx Java
reactive-programmingside-effects
我有一个问题,它是简单的业务逻辑流程:
检查多个部门的员工、部门和员工关系是否在缓存中,首先检查缓存中是否存在这些关系,如果存在,检查是否存在
员工属于它,如果不在缓存中,则从数据库中获取,并检查员工的关系,然后将部门信息保存到缓存中
代码如下:
public Observable isEmployeeInDepartment(List<Long> departmentIds, long employeeId){
//this observable will resol
标签: Rx Java
observer-patternreactivex
我开始学习ReactiveX,但无法理解RESTAPI或任何没有关联UI的API的优点
例如,我正在构建一个Spring Boot REST API,该API由托管在不同机器上的Angular webapp(以及最终的其他应用)使用。每当我从webapp发送请求时,我都会使用承诺来处理结果,而不会阻塞
如果是这样,在Spring引导应用程序中使用ReactiveX(在这种情况下是RxJava)有意义吗?它会带来什么好处?使用ReactiveX,您可以同时异步执行不同的调用,当所有这些调用完成后,
我有一个“外部”观测值和一个“内部”观测值的缓存。我选择哪一个“内部”可观察对象取决于“外部”可观察对象的输出。当它们中的任何一个发生变化时,我希望结合它们的结果。如果“外部”或“内部”更改,我希望合并新值。我尝试使用flatMap执行此操作,但遇到了触发太多事件(即重复事件)的问题
这个输出类似于
00:17:36 Outer(0):Inner(zero:0)
00:17:37 Outer(0):Inner(zero:1)
00:17:38 Outer(0):Inner(zero:2)
00:
标签: Rx Java
rx-java2rx-kotlinrx-kotlin2
可能是一个新来的问题
如果我有两个rx流,它们有一些公共部分,是否有可能提取和重用它们
以下是伪代码示例:
someManager.getInfo(id) returns a Single<SometInfo>
someManager.getInfo(id)返回一个
这个游戏需要一个id,并且应该只返回一个带有状态的单张
fun getStatus(id: String): Single<Status> =
someManager.getInfo(id)
使用“observeOn(Scheduler)”,rxjavaobserveable会发现,当项目从不同的线程发出时,有时会遗漏一些项目
这是密码
val subject = PublishSubject.create<String>()
subject
.observeOn(Schedulers.io())
.subscribe { x: String? -> println(x) }
val t1 = Thread { subject.onNext(&q
我正在阅读一本书,这本书讲述了如何在遗留块代码中引入RxJava
它基本上是这样做的:
Observalbe.defer(() -> {return Observable.from(someBlockingMethodTofetchDBRecords(....))});
它说延迟会在节目中引入懒惰。但是默认情况下是不可见的。Observable.from应该返回一个Observable,在我们订阅它之前它不会做任何事情。那么为什么我们需要在这里延迟呢?因为一些用于蚀刻数据库记录的阻塞方法
标签: Rx Java
reactive-programmingrx-java2
考虑以下几点
类虚拟{
私人娱乐第一步():单人{
返回单身。只是(“大佛”)
.subscribeOn(Schedulers.io())
}
乐趣行动1(){
第一步()
.地图{
地图素材(it)
}
杜恩先生{
println(${Thread.currentThread().name}it)
}
杜恩塞斯先生{
行动(it)
}
.flatMap{
return@flatMap另一步(it)
}
.observeOn(Schedulers.trampoline())
.subscribe
我是RxJava新手,但我正在将其集成到一个项目中,以帮助我学习它。我遇到了一个关于最佳实践的问题
我有一个问题,关于如何处理onError以防止可观察的处理停止
以下是设置:
我有一个用户名列表,每个用户名我都想做2个或更多的网络请求。如果任何对userid的网络请求失败,那么该userid将不会被更新,并且可以被跳过。这不应阻止处理其他用户标识。我确实有一个解决方案,但它涉及嵌套的订阅(参见第二段代码)。我确实看到的一个问题是,如果每次呼叫都失败,即使在检测到某个阈值号码失败后,也无法短路并
考虑以下用例:
需要尽快交付第一件物品
需要以1秒超时消除以下事件的抖动
我最终实现了基于操作符或随时间变化的自定义操作符,然后像这样使用它
.lift(new CustomOperatorDebounceWithTime<>(1, TimeUnit.SECONDS, Schedulers.computation()))
.lift(新的自定义运算符debouncwithtime(1,TimeUnit.SECONDS,Schedulers.computation())
Cus
我有以下用例(这当然是一个人为的例子,但一旦我知道答案,我将能够将其移植到我要解决的实际问题):
获取整数列表
根据%4操作的结果对它们进行分组
将每个组的元素收集到列表中
忽略元素少于3个的任何组/列表
发出一个列表,其元素为步骤3中创建的列表
这是我目前的代码:
可观察
.from(数组.asList(0,1,2,3,4,5,6,7,8,10,11,12))
.groupBy(项目->项目%4)
.订阅(groupedObservable->{
群可观测
托利斯先生()
.filter(li
subscribe(newobserver())和subscribe(newsubscriber())之间有什么区别?
我应该在什么上下文中使用哪一个?要回答您的问题,您应该深入了解这两个函数的代码,以及
简而言之,VersionWithObserver首先检查传递的参数是否为订阅服务器的子类型,如果是,则调用第二个版本,否则它只将传递的观察器包装在订阅服务器中并调用第二个版本
鉴于第一种方法基本上是包装器,除非您有任何理由使用Observer而不是Subscriber,否则您应该使用后者。还要
正在寻找一种干净的方法,将源可观察的转换为在一段时间内不发射项目后发射单个null(或sentinel值)
例如,如果源可观测物发射1,2,3,然后在发射4,5,6之前停止发射10秒,我希望发射的项目为1,2,3,null,4,5,6
该用例用于在UI中显示值,其中显示的值应变为破折号-或N/a(如果最后发出的值过时/旧)
我查看了timeout操作符,但是当超时发生时,它会终止可观察的,这是不需要的
使用RxJava。您可以通过稍微复杂的发布amb计时器设置来实现这一点:
PublishSub
根据我的观点。我认为share()与publish().autoConnect()是一样的。但是在这段代码中,结果是不一样的
Observable<Integer> cold = Observable.create(subscriber -> {
for (int i = 0; i <= 2; i++) {
System.out.println("Hot Observable Emit " + i);
sub
标签: Rx Java
observableretrofit2rx-android
我有一个可变长度的可观测阵列。我希望压缩请求(即,生成一堆API请求并等待它们全部完成),但我不知道如何实现zip函数
Observable.zip(observables, new FuncN<List<ResponseBody>>() {
@Override
public List<ResponseBody> call(Object... args) {
return Arrays.asList(args); <- c
标签: Rx Java
reactive-programmingrx-java2backpressure
我一直在为我认为是一个相当基本的问题而挣扎
我有一个Flowable,它从网络中检索一组项目并将其发送出去
Flowable
.create(new FlowableOnSubscribe<Item>() {
@Override
public void subscribe(FlowableEmitter<Item> emitter) throws Exception {
Item item = retriev
如果有一个可观察的列表,例如:
Observable<Msg> obs1 = getObs1();
Observable<Msg> obs2 = getObs2();
List<Observable<Msg>> listOfObs = new ArrayList<Observable<Msg>>();
listOfObs.add(obs1);
listOfObs.add(obs2);
Observabke allObs
标签: Rx Java
reactive-programmingrx-java2reactivex
我正在编写一个与远程服务器对话的REST客户端,需要执行类似的操作:
从远程服务器检索项目列表
将收到的列表与本地数据进行比较
对于任何已在本地修改的项目(可能没有),请将更改提交到服务器
最后,将项目的完整列表返回给调用者
我正在使用RxJava(2),在实现上有点困难。我认为应该大致如下(伪代码):
公共单同步()
{
return server.fetchItems().flatMap(receivedItemList->{
List syncList=新列表();
用于(项目:rec
我有一项活动,我正在做以下工作
fun getJust(): Observable<String> {
Log.e("TEST", "getting just")
return Observable.just("ABC")
}
private fun stream(): Observable<String> {
return getJust()
.doOnSubscribe
我有一张单人票和一张流动票。我想用Flowable的第一个值和Single的值做一些操作。我用什么接线员
如果这是两个可观察的或两个可流动的,我会使用combinelatetest,但我在Single或Maybe上找不到类似的运算符。好的,答案是zipWith,它对Single/Maybe进行组合,Single.toObservable()或Single.toFlowable()?这是最后的手段。最好是一个运算符您将只有一个排放,这就是您要寻找的?是的,在问题中,我指定要将单个值与第一个Flow
我想收集集合中发出的每个项目。我是这样做的:
List found = new ArrayList();
controller.search(query)
.doOnCompleted(new Action0() {
@Override
public void call() {
//Do something with "found" list
}
}).su
标签: Rx Java
reactive-programmingobservablereactivexrx-scala
假设我有两个可观测的排序双精度。我想得到它们之间的差异,作为一个可观察的。例如:
1 2 4
left: ──o───────o───────────o────/
1 3 4 5
right: ──o────o──o───o───/
l2 r3 r5
output: ──────────o───────────o────o
标签: Rx Java
observablesubscriberx-java2reactive
在rxJava 1.x中,可以执行以下操作:
Observable.<Foo>create(subscriber -> {
...
getNewObservable().subscribe(Subscribers.wrap(subscriber));
})
Observable.<Foo>create((ObservableEmitter<Foo> emitter) -> {
...
getNewObservabl
我正在努力研究如何用RxJava表达以下内容。假设我有一个要在网络上传输的数据包流列表:
PublishSubject<List<Packet>> packetsSubject = ...;
PublishSubject packetsObject=。。。;
我有以下传输功能:
public Observable<Status> transmit(Packet p) {...}
公共可观测传输(包p){…}
只要返回的数据包状态为status,我就要发
一些RxJava方法(如just)会导致为varargs参数警告创建未经检查的泛型数组
为什么不使用@SafeVarargs来解决问题?SafeVarargs是Java 7的一项功能,由于Android的原因,RxJava必须以Java 6为目标。谢谢您的回答!但我仍然有疑问,是因为@SafeVarargs是在Android API19中添加的吗?我们支持Android 2.3+,即API级别9+:谢谢,我理解。
我正在使用RxJava CombineTest和两个可观测值。
我的问题是,如果其中一个可见光以onError结尾,联合收割机会调用onNext吗
文档显示了一个令人高兴的场景,当两个观测值都正确地发射项目时,但我找不到在错误情况下会发生什么
|----onError----可观察到的1
|-----------O-------O-------可观察到的2
|--------------?--------------CombineRelateTest-will onNext将被称为?总合同是这样
与我想要的最接近,combinelateest函数执行以下操作:
但我需要这个:
我查看了RxJava的几乎所有功能,但没有发现任何合适的功能。您可以在开始时添加“empty like”元素,因此“1”+=“1”:
感谢@zella和@Gustavo的解决方案,这几乎是我所需要的,但在他们的情况下,组合源仅在其中一个源发生时发射。下面的代码片段正是我想要的:
Observable.combineLatest(
source1.startWith(""),
source2.st
标签: Rx Java
reactive-programmingrx-java2vert.xrx-java3
我从文件中读取内容,我使用新行分隔符将一些名称放入文件中,但在通过vertx文件系统读取文件时,我无法提供所需的过滤器。每次打印文件的所有数据
以下是代码片段:-
vertx.fileSystem().open("data.txt", new OpenOptions(), handler -> {
final AsyncFile asyncFile = handler.result();
final Obs
我有一个Utils.send方法,它通过HTTP将JsonObject请求发送到远程目标,并接收一个结果,该结果作为Maybe返回给调用者。
它可以按如下方式使用:
Maybe<JsonObject> response = Utils.send(jsonObjectRequest);
Maybe response=Utils.send(jsonObjectRequest);
此响应可能为空(Maybe.empty())
我想将此方法包装到我的自定义方法中,如果结果为空,该方法将重
我知道您应该不惜一切代价避免这种情况,但如果我在RxJava中有一个有效的子类Observable用例呢?可能吗?我怎么做呢
在本例中,我有一个“repository”类,它当前返回请求:
class Request<T> {
public abstract Object key();
public abstract Observable<T> asObservable();
[...]
public Request<T>
我有这样的想法:
return Observable.timer(2, TimeUnit.SECONDS)
.flatMap(integer -> captureList.getLatestCaptureCut()) #1
.flatMap(vp::processVideo) #2
.observeOn(AndroidSchedulers.mainThread())
文档非常清楚可观察的替代方案的作用:
单类实现了单值响应的反应模式。有关值流或向量的反应模式的实现,请参见Observable。
Single的行为与Observable相同,只是它只能发出一个成功值或一个错误(与Observable不同,没有“onComplete”通知)
就像一个可观察的对象一样,单个对象是懒惰的,可以是“热”或“冷”,同步或异步的
但我看不出有什么原因,它可能比可观察的更简单,有时,在哪里以及为什么要使用它?在某些情况下,可观察的只返回单个项目和结束(或错误)是有意义的。简
有一个从其受体收集数据作为响应的流
所以假设我有n个受体,在接收到的帧数据中,我能够知道它是从哪个受体索引接收的,并执行一个过滤器,为每个受体创建一个新的源,如下所示
-------a1---b1---c1---a2---a3---c2---b2-
-------a1---------a2------a3-------------
---------------b1-------------------b2-
----------------------c1-------c2---------
标签: Rx Java
reactive-programmingrx-androidreactivex
我正在使用RxJava。我必须每5秒钟执行一项特定任务。通过使用“可观察的间隔”方法,这一点非常有效。
但是,我也有以下限制:如果上一个任务没有完成,则不能执行新任务。在这种情况下,新任务只需要在最后一个任务完成时执行
我不知道如何简单地用RxJava实现这一点
所有的想法都将非常感激^^
感谢阅读。当您调用interval时,将选择一个与给定计划程序中的单个线程相对应的工作线程(默认值为计算):
或者,如果任务未返回任何内容:
Observable
.interval(5, TimeUn
标签: Rx Java
observableretrofit2rx-java2
我正在使用rxjava2、rxjava2和适配器rxjava来实现我的http api调用
//API definition
Observable<String> queryProducts(@Body Query query);
//API implementation.
serviceApi.queryProducts(query)
.subscribeOn(new Scheduler().ioThread())
我是RxJava新手,不知道如何使用ConnectableObservable实现可重复轮询,2个订阅者在不同的线程上处理事件
我有一条大致如下的管道:
我想在延迟之后,以类似于解决方案的方式重复整个管道
或者
这在普通(不可连接)可观察的情况下可以正常工作,但在多播情况下则不行
代码示例:
工作
final int[]i={0};
可观测积分可观测=
Observable.defer(()->Observable.fromArray(i[0]++,i[0]++,i[0]++));
积分可观测
标签: Rx Java
reactive-programmingrx-java2
我仍在学习reactive/rxjava的一些细节,并对我经常遇到的一个用例提出了一个快速的问题
我们通常有一些服务从数据库或其他数据源获取条目,并通过HTTP控制器将其返回给其他服务或客户机。我的问题是这样的:将此数据公开为可流动的与可能的(或单独的,如果合适)之间有什么实际区别吗。所有数据将从数据源一次加载,因此它将位于本地列表中
我假设如果有一个DB调用返回大量值的用例,那么通过Flowable公开它们可能是一个更好的选择,以便调用方能够以自己的速度处理它们?但如果只是一小部分记录,我想这
我希望在成功重试后收到成功消息。以下是我的最小工作示例:
var i = 0
Observable.just(i)
.flatMapSingle {
println(i)
i++
when {
it < 3 -> Single.error(Exception())
我需要使用一个可观察对象执行一个任务,获取发出的数据,然后使用另一个可观察对象执行另一个查询。
通常,我会使用flatmap来链接两个观察值,并只关心发出的最后一段数据。
现在需要在这个过程中更新视图。比如说,当第一个可观察到的数据发出时,我应该在UI上显示一行文本
我知道我可以将它分解为两次调用,调用第一次调用的onNext()中的第二个可观察对象,但这很麻烦
不走那条路,我怎么能实现我上面描述的?
感谢以下是纽约老项目的一个例子:
注意.doOnNext(l->VideoActivity.t
我有一个应用程序,用户可以根据他们订阅的主题接收通知。工作流程如下所示:
用户登录
应用程序将注册到通知服务器
用户选择订阅/取消订阅各种主题
我希望所有的网络请求都被序列化。如果我在初始化时知道要订阅/取消订阅的确切主题,我可以编写如下流:
loginObservable.subscribeOn(Schedulers.io())
.flatMap(user -> registerApp(appId))
.flatMap(o ->
我正在学习RxJava 2,在重构代码以使用Single而不是observeable之后,我发现Single并没有实现doafterminate()
在我看来,这是一个疏忽,因为所有可流动的、可完成的和可能的类型似乎都实现了doafterminate()
提供了什么?事实上,由于缺乏对该操作员的需求,该操作员已丢失。PR#5093添加了它,并将成为本周三到期的RxJava 2.0.6的一部分
到那时为止,最接近的解决办法是doFinally:
Single.just(1).doFinally((
我需要把随机发生器写成可观测的。观察者将在很长时间内对下一个值进行处理。所以我只想在观察者需要的时候发出新的随机数,或者,可能的话,两个额外的数,但不要更多
怎么做
注意:如果有必要,我使用Java。您可以使用fromCallable和repeat来获取新值,只要消费者有需求
例如:
Random random = new Random();
Observable.fromCallable(() -> random.nextDouble())
.repeat()
.map(rnd ->
我正在使用VertxRxJava订阅服务器使用RESTAPI。我想在出现特定错误时重试请求,下面是我的代码。
当我运行这个程序时,重试会无限地进行,我宁愿让它重试三次
我是RxJava新手,仍然掌握着用Rx方式做事的诀窍。
有人能告诉我是否遗漏了什么吗?谢谢
responseSingle.map(response -> {
//do some stuff
return responseBody;
}).retryWhen(e
我正在rxJava2中寻找一种方法,在执行操作后,我可以再次调用订户。让我具体描述一下我的意图
我使用改型2进行联网,但这并不重要。这个问题是关于rxJava的。我想在刷新令牌后重试api调用。我需要设置的是,每次进行任何api调用时,令牌都可能过期,我有责任调用另一个api调用来刷新令牌。在代币之后
刷新后,我需要再次调用具有过期令牌的相同api调用。因此,对于用户来说,它可以无缝地刷新令牌
以下是我用于所有网络呼叫的默认订阅:
public class DefaultSubscriber&l
我正在Android上的可编辑文本字段中使用Rx。我想检索订阅方法中的原始用户输入
到目前为止,我一直使用它来保留用户输入。我想使用switchMap来避免处理旧结果的结果。如何将此代码迁移到switchMap:
editText.textChanges().map { it.toString().trim() }.flatMap(
{
Observable.just("Data: $it")
},
1 2 3 4 5 6 ...
下一页 最后一页 共 13 页