如果我对已死亡的ActorRef调用context.watch(),是否保证仍会收到终止消息
另外,在收到关于特定参与者的终止消息后,是否仍需要调用unwatch()
另外,watch()调用引用是否已计算在内?如果我调用watch()两次,然后再调用unwatch()一次,是否保证仍会收到终止消息?我认为文档非常清楚:
“一个重要属性是消息将被传递
无论监控请求和目标的
终止发生,即即使在此时,您仍会收到消息
目标已经死了。”
而且,你不需要取消对演员的惩罚,因为演员不能死两次,而且也不
标签: Akka
scala-2.10playframework-2.2
我有一个计数器“numberOrders”,我想每天午夜重置它,以了解我一天收到多少订单,我现在拥有的是:
val system = akka.actor.ActorSystem("system")
system.scheduler.schedule(86400000 milliseconds, 0 milliseconds){(numberOrders = 0)}
这段代码在def中,每次我收到新订单时都会调用def,所以我想它做的是:在第一个订单或每个订单24小时后重置numberOrde
我刚从Akka开始,已经创建了一个测试应用程序。在其中,我创建了一群参与者,他们创建了一个调度器来生成心跳事件。对于另一种类型的事件,我使用heartbeat.cancel()取消调度程序,但我希望在发生其他事件时重新启动它。如果我重新创建调度程序,我会看到内存消耗不断增加
接下来的问题是如何恢复调度程序,或者如何正确地处理调度程序
这是那个演员的密码
public class Device extends UntypedActor {
enum CommunicationStatus
我的系统运行Play framework 2.5和Akka内置。我相信初始化路由器有两种方法
1:
2:
所以我有两个问题:
它们不同吗
哪条路更好
非常感谢你 不同之处在于,当前版本的Akka(2.4.x)支持1号,而2号已经停止使用
RoundRobinRouter是Akka 2.2.x的一部分,在Akka 2.3.x(请参阅)中首先被弃用,然后在Akka 2.4.x中被删除
ActorRef router = akkaService.getActorSystem()
.actorO
让我们考虑一个简化的例子
def routes: Route = {
pathPrefix("parent" / IntNumber) { (id) =>
get {
complete(id)
} ~
pathPrefix("child") { // this would be in separate file
get {
complete(s"parent/$id/child")
}
}
}
}
当子参与者收到自定义重新启动消息时,参与者应自行重新启动。
(目的是重置参与者成员变量,从db重新加载外部状态,但不清除参与者内部消息队列)
要实现重新启动,一个解决方法是子参与者抛出自定义异常,父参与者将其OneForOneStrategy配置为为此特定异常类型重新启动子参与者
我想知道,是否有更直接的方法来重新启动
其目的是重置参与者成员变量,从db重新加载外部状态
我想,这可能是最大的问题,因为加载外部状态可能需要时间,并且会阻塞操作,因此操作的结果是或应该是Future[]-因此,在将来
我正在使用spray can客户端编写一个http服务客户端库,它使用akka io
为了让客户端正常工作,我需要设置一些非默认的spray.can.configconfig选项(我需要设置respone chunk aggregation limit=0以正确处理大块响应)
我希望1)以某种方式将此设置捆绑到客户端,这样库的用户就不必进行任何显式配置,2)使其仅适用于我的客户端库使用spray client的情况,而不是在客户端库的用户在其他地方使用spray client的情况下不经意地错误
标签: Akka
java.util.concurrentakka-remote-actor
最近,我在我的项目中实现了一个远程参与者模式。我想知道哪条路是同时运行的演员
路1
for(int i = 0; i < 100; i++){
Patterns.ask(Props.create(Worker.class),$someJobs, timeout);
}
for(int i=0;i
我是Akka的新手,我们正在为多个事务构建一个健壮的系统,可以扩展到每分钟5000个请求
调度器将列表或请求推送到Akka主参与者
Akka的主要参与者有子组件的引用,在子组件上说“询问”,并等待未来,子组件与“webservice适配器”交互,适配器在其中调用Web服务
在“webservice适配器”中,我们存储对数据库的请求和响应
一旦响应可用,适配器将结果返回给Subactor,然后返回给Future,以便在Actor中进行进一步处理
问题是:
由于我们在主参与者中使用“Ask”来调用子
我正在测试Akka,在application.conf中有这个配置
akka.actor.deployment {
/OperationManagerActor/StatusCheckerActor {
router = smallest-mailbox-pool
resizer {
lower-bound = 1
upper-bound = 10
messages-per-resiz
我正在使用Spray/Akka创建一个简单的restapi,以接收json消息并将其传递给apachekafka生产者。ApacheKafka生产者是一个非阻塞API,用于向Kafka消息代理发送消息,并且是线程安全的,应该由所有线程共享
我的基本架构是路由特性中的以下伪代码
val myKafkaProducerActor = system.actorOf(Props[KafkaProducerActor])
val route = {
path("message") {
get
在我阅读了AKKA团队编写的这篇优秀的博客之后,我运行了代码,它确实有效
但当我做另一个实验时,稍微改变一下,在工人中抛出一个异常,那么这个模式就不起作用了。因为工人在工作中可能会抛出任何类型的异常是合理的,对吗
以下是我的代码,两个文件:
Reaper.scala,摘自上述文章:
import akka.actor.{Actor, ActorRef, Terminated}
import scala.collection.mutable.ArrayBuffer
object Reaper {
有人能帮我为下面这个抽象的Akka actor伪代码编写一个测试用例吗
我面临的问题是,所有的监控策略消息(作为测试用例的一部分发送)都被父参与者使用,而不是将其监控策略应用于子参与者
父抽象参与者创建子对象
Abstract class Parent extends UntypedActor {
String name;
int noOfChildActors;
//this set is used to manage the children i.e noOfChildActo
我听到了很多关于阿克卡的好消息。
我有一个价格流,我需要创建两个移动平均价格。
最后,我有一个策略,它会倾听这些信号=>价格,移动平均价格1和移动平均价格2,并做出一些决定
我通过actors实现了这一点,但不幸的是,我有一个顺序问题,即a)我不知道如何为生成的消息或事件指定时间戳或优先级顺序
final ActorSystem actorSystem = ActorSystem.create("backtest");
final ActorRef strat = ActorSystem
标签: Akka
akka.netakka-cluster
我正在研究域驱动设计,想知道作为聚合的参与者的实例是否可以同时存在于全球网络/集群的两个地方,比如在美国和欧洲,这样在这两个地方都有工作人员的客户就不会经历任何网络延迟?从概念上讲,一个“实体”可以有两个实例在您的集群中。然而,另一个问题是如何将它们彼此同步以保持实体状态的一致性?因此,两个参与者在数据库中的同一行上使用相同的akka://address在欧洲有一名演员,美国有一名演员,而只有一排演员的情况下,“是不是不可以”,无论如何,您都会遇到延迟,因为这两个参与者必须在某个时候与该行通信。
标签: Akka
akka-streamakka.net
由于不推荐使用ActorPublisher而支持GraphStage,看起来我不得不放弃actor管理的状态,而选择GraphStateLogic管理的状态。但使用actor管理的状态,我能够通过向我的actor发送任意消息来改变状态,而使用GraphStateLogic,我不知道如何做到这一点
因此,在前面,如果我想创建一个源来公开通过HTTP请求/响应提供的数据,那么使用ActorPublisher,请求通过请求消息传递给我的参与者,我可以通过在后台启动HTTP请求做出反应,并向我的参与者发
标签: Akka
akka-streamakka-http
根据定义,http响应分为3部分,状态代码->标题->正文,当执行akka客户端http请求时,在前两部分完成后接收http响应
收到
val responseFuture: Future[HttpResponse]
responseFuture.map {
case HttpResponse(statusCode:StatusCode, headers:Seq[HttpHeader], entity:ResponseEntity, protocol:HttpProtocol)
标签: Akka
akka-clusterakka-typed
我有akka集群和非类型的演员系统。
现在我必须将一个节点更改为使用类型化的参与者
问题是:如何从非类型化的actor系统创建类型化的shardregion?
基本上我需要这样的smth:
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
ClusterSharding(untypedActorSyste).sharding.spawn(
behavior = entityId ⇒ counter(ent
我有下一个代码:
//TestActor got some message
class TestActor extends Actor {
def receive = {
case string: String => //....
}
}
//TestReg when create get ActorRef, when i call `pass` method, then should pass text to ActorRef
class TestReg(val
我是阿克卡的新手。我正在尝试Akka Java文档中的第一个集群示例。代码如下:
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent.ClusterDomainEvent;
public cla
报告说:
当目标参与者终止时,代理也将终止
我对以下情况感到困惑:
远程机器崩溃了
由于网络问题,无法访问远程节点
在这些情况下会发生什么?代理是否仅在目标正确关闭并且能够发出终止消息时终止?如果代理未在这些条件下终止,那么使用可靠代理的系统如何从网络或远程机器故障中恢复?从Akka 2.2开始,可靠代理将在您描述的条件下自动停止。ReliableProxy监视隧道,并在接收到隧道的终止后自行停止
在Akka 2.3中,我提供了一个whereReliableProxy将在目标终止后尝试重新连接到
这更像是一个最佳实践问题。虽然它确实会影响性能
在一个应用程序或actor系统中应该有多少调度器?
例如,一个调度器用于消费者,一个用于生产者,一个用于管理,这样可以吗。。。?还是我应该把调度员的数量控制在最低限度
编辑:
如何阻止I/O操作—例如从套接字读取?
执行此操作的参与者是否应该有一个单独的调度程序?如果您没有阻塞(托管或非托管)线程,那么一个调度程序就可以了
否则不要执行阻塞:)。如果确实需要,请在单独的调度程序中执行阻塞。我从不阻塞。但我的想法是:如果我想分别为工人参与者和管理参与
是否有可能限制Akka参与者(包括其子女)直接与系统内除家长(或其他定义的参与者)以外的其他参与者进行交流
我正在寻找某种形式的访问或可见性控制。为了使用OOP术语,我想让一些参与者成为私有的或受包保护的。我目前的理解是,任何参与者都可以向参与者系统中的任何其他参与者发送消息。我想限制这一点
更多细节:我正在考虑创建一个Akka集群,允许其他Akka应用加入其中。我想确保他们不能“做”比我允许他们做的更多。我不明白你所说的“我将允许定制akka应用加入何处”是什么意思。通常,akka集群中的所有
我用下面的代码不断得到未解决的依赖项。我能做些什么来清除错误
name := "AkkaDemo"
version := "1.0"
scalaVersion := "2.11.8"
val scalaTestVersion = "3.0.1"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
lazy val akkademoService = project.set
我最近开始学习Akka演员,看到演员有两种定义
class Main extends AbstractBehavior<String>
我正在学习JavaAPI,有人能解释一下它们之间的区别吗
类Main扩展了抽象行为
是Akka类型的,这是从Akka 2.6定义演员行为的方法。这是在2.5,但作为实验
类SomeActor扩展了AbstractActor
用于非类型演员,从Akka 2.6起命名为经典演员
如果您使用的是Akka 2.6,我建议您使用AbstractBehavi
我有很多线程和线程池的Java应用程序。我们可以用AKKA替换线程和线程池吗 这取决于线程正在执行的操作。他们是在阻止IO还是在他们之间使用锁和共享可变数据?如果是这样的话,akka可能不太合适,因为演员通常应该避免io或锁的阻塞。另一方面,如果线程执行独立的非阻塞工作,并且可以通过消息传递进行通信,那么akka可能是一个很好的选择。这取决于线程正在执行的操作。他们是在阻止IO还是在他们之间使用锁和共享可变数据?如果是这样的话,akka可能不太合适,因为演员通常应该避免io或锁的阻塞。另一方面,
我安装了typesafe activator(Windows)以使用akka框架。但当我试图启动激活器时,它并没有像Akka网站上提到的那样在浏览器中打开。我尝试使用命令提示符打开它,但它显示“jansi 1.11无法检索”错误。如何重新爱上它?看起来您无法下载正确版本的jansi.jar,不幸的是,下载中没有包含jansi.jar
如果您在Windows上使用web代理,则必须按照“代理背后”一节中描述的配置步骤进行操作。看起来您无法下载正确版本的jansi.jar,很遗憾,下载中未包含该版本
我让一个演员向另一个演员发送消息。它多次成功地执行此操作,但在几条消息之后,第二个参与者停止处理这些消息。系统本身的负载不是很高
再现问题的测试是:
test("case2: Primary (in isolation) should react properly to Insert, Remove, Get") {
val arbiter = TestProbe()
val primary = system.actorOf(Replica.props(arbiter.ref, Per
标签: Akka
zeromqdistributed-computing
Akka文档讨论了各种看似相互关联的Akka技术,但没有对它们进行太多区分:
阿克卡网络
阿克卡远程处理
阿克卡聚类
Akka ZeroMQ模块
我的理解是,“Akka网络”只是一个模块/库,它使Akka能够通过TCP与远程参与者系统通信。Akka远程处理是另一个模块/库(不包含在核心Akka JAR中),它使Akka能够使用八卦协议。Akka Clustering是另一个模块/库,它使用这些八卦协议允许远程参与者系统聚集在一起,并以病毒式/服务发现式的方式共享状态更改。我对Akka Zer
是否有Akka适用的特定端口范围?我以前想在2550-2570上配置它,但我不允许在生产中使用该范围。在它的位置上可以选择任何一个吗
谢谢 Akka将绑定到任何端口,只要该端口当前未在该计算机上使用。是否尝试将端口设置为零?我想零会有用的。有一个模糊的记忆在某处读到它。0只是告诉Akka选择一个随机打开的端口。对,但如何限制它使用特定的范围?
我使用Flink cluster(单机版),它在几个小时后将任务管理器与作业管理器断开连接
我遵循flink.apache.org中设置独立集群的基本步骤,使用1个主节点和2个工作节点,它们是Mac osx。
只有1名工作人员与jobManager断开连接
我共享日志文件,请给我解决方案
下面是作业经理的日志
2017-01-28 16:58:32,092 WARN org.apache.hadoop.util.NativeCodeLoader -
标签: Akka
akka-httpcircuit-breaker
akka http客户端上断路器的一个示例看起来非常简单,但对我来说,它不起作用
object HttpWithCircuitBreaker extends App {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val breaker =
new CircuitBreaker(
我有一个方法:def-Sighting(from:YearMonth):Future[Seq[Sighting]]
另一个:def目击(from:YearMonth,to:YearMonth):Source[signing,NotUsed]
我想为每个YearMonth调用第一个,从from开始,到结束,然后合并/合并结果。我似乎无法在Source上找到合适的方法来实现这一点。我现在看到的是:
val months = from.until(to, ChronoUnit.MONTHS) + 1
如何将Akka 2类型的演员的超时时间从默认的5秒更改为5秒
提到超时的TypedProps构造函数是受保护的[TypedProps](我不知道这是什么意思)。TypedProps[Foo]()。带超时(timeout)TypedProps[Foo]()。带超时(timeout)如果要更改已键入演员的默认超时,可以在akka配置中设置akka.actor.typed.timeout值。例如:
akka.actor.typed.timeout=60s
根据定义,这是“具有非void返回类型的类型
我正在创建代表物理设备及其状态的参与者。当设备上线时,我通过向参与者的路径发送和识别消息来“按需”创建它们,如果它还不存在,我就创建一个。可能会有几百万台这样的设备
我担心的是,随着演员数量的增加,身份查找将受到性能影响。这是一个合理的担忧吗
我曾考虑使用路由器策略来分割参与者,但后来我发现,在路径上使用通配符搜索路由器会从每个路由器生成ActorIdentities。我假设一个一致的HashingRouter适合这种情况,但在我进入兔子洞之前,我只想确保我没有过早地进行优化。创建参与者的实体只
我希望有几个参与者(它们表示基于Akka IO部分的TCP连接)。这些参与者应更新通用模型(内存中)。此模型保存在管理此模型的其他参与者中
我的问题是,如何设置此结构?有没有办法告诉阿克卡某个演员只有一个例子
备选方案如下:
我已经有了一个actor,它接受新的TCP/IP连接并将它们传递给新的actor。现在我可以在连接接收器中创建这个“模型管理器角色”,并将这个角色传递给新创建的每个连接角色。但在我看来,这似乎不是一个好方法,因为它将模型管理器与连接接收器联系起来
有人知道适合这种情况的解决
这里是Java/Akka v2.3.9。我的每个Akka UntypedActor子类都能够响应多个通用消息,例如ExecuteOrder66:
假设我有100个不同的actor子类,每个子类都支持ExecuteOrder66。我需要一种方法将这一信息的实例传达给我的每一位演员;就像一个公共广播,每个人都能听到
我想上面的Akka文档链接让我很接近,但我没有看到一个能向我的每个演员发送ExecuteOrder66的链接。有什么想法吗?问题是不太清楚每个人都是谁。如果某个参与者a从远程参与者系统收
我正试着将我的大脑围绕在Akka上,在阅读了2.3.11开发指南之后,我仍然对演员监控有点困惑。我主要关注两个问题:
被监视的参与者死亡或重新启动时发生的消息传递;及
首先,“同侪监督”的必要性或有用性
当我context.watch(someActorRef)时,我对当someActorRef死亡或重新启动时收到的消息感到困惑。我是否收到终止消息或死亡密码异常
对我来说,监控实际上是一种监督形式,因此只有主管/家长演员观看儿童演员才有意义,但我相信API会通过主管策略在幕后自动进行监督。因
Akka HTTP(正式名称为Spray)的一个特性是它能够自动地将json中的数据来回封送和解封送到case类中,等等。我已经成功地使其正常工作
目前,我正在尝试创建一个HTTP客户端,该客户端使用查询参数执行GET请求。当前代码如下所示:
val httpResponse: Future[HttpResponse] =
Http().singleRequest(HttpRequest(
uri = s"""http://${config.getString("http.serve
标签: Akka
reactive-programmingsprayakka-http
目前,我正在尝试在Akka HTTP中实现。我面临的问题是,这种模式在文档中的任何地方都没有记录。似乎没有办法做到以下几点:
IO(Http) ! Http.Bind(serviceActor, "localhost", port = 38080)
如何在不使用Spray的情况下为每个请求使用一个Akka actor?该类有一个可用于此目的的方法bindAndHAndleAsync。此方法接收具有以下签名的函数:
handler: (HttpRequest) ⇒ Future[HttpResp
标签: Akka
akka-clusterakka-remoting
我在Akka集群中有两个节点
我使用以下内容订阅群集的所有ClusterDomainEvent:
cluster.subscribe(
self,
InitialStateAsEvents,
classOf[ClusterDomainEvent])
当两个节点中的一个关闭时,我接收到一个不可访问的事件,并且我开始每隔几秒钟接收一些日志,这些日志警告我如下:
Association with remote system [akka.tcp://application@12
标签: Akka
sprayakka-httpspray-dsl
我有一个使用Spray Custom Directive0的路由处理程序过滤器
此自定义指令的目的是构建一个请求筛选器,以确定请求处理时间
在spray自定义指令中,我可以使用RequestContext的函数withHttpResponseMapped获取参数HttpResponse=>HttpResponse,withHttpResponseMapped将返回一个新的RequestContext对象,如下所示:
def timeRequestInterval: Directive0 = {
标签: Akka
akka-streamakka.net
我有一个演员,他接收天气状况,然后(通过使用OfferAsync)将其推送到源代码。目前,它被设置为为为接收到的每个项目运行(它将其存储到数据库)
公共类StoreConditionsActor:ReceiveActor
{
公共存储条件提供程序(ItemTemperatureDataProvider temperatureDataProvider)
{
var materializer=Context.materializer();
var source=source.Queue(10,Ove
我有一个场景,其中路由包含交替的键/值对段。对的数量未定义,收集这些对的有效方法是什么。例如,路线可能看起来像
/key1/value1/key2/value2/key3/value3
我想要一个与
(key1, value1) -> (key2, value2) -> (key3, value3)
我知道我们可以得到一个路径段列表,并按照上面的方式进行转换。但我要找的是一种能沿着这条路走下去的东西
class EntitiesSpec extends WordSpec with
有时有些库,如R2DBC,返回一个反应流,即Reactor Flux,但在Http层中,我们需要另一个反应流,即AkkaHttp知道的AkkaStreams源,并使用Http客户端创建一个流式场景,用于持续内存处理
在反应流实现之间转换的方式是什么?是否有可用的适配器库?是的,这两个库(Reactor和AkkaStreams)都可以在它们的类型(源、可流动/可观察)和Java 9中提供互操作性的发布者类型之间进行转换。例如
Source.fromPublisher(r2dbc.query().a
标签: Akka
event-sourcingakka-persistence
如果您熟悉Trello,将整个Trello板存储为参与者(使用akka持久性)是一个好的用例吗
trello板由以下部分组成:
列表
列表中的任务
每个任务都可以有注释和其他属性
在决定akka持久性是否是给定问题集的良好用例时,一般的最佳实践或考虑事项是什么
这主要取决于应用程序要执行多少写操作
Akka persistence是一种实现非常高的写入吞吐量的方法,同时确保数据的持久性,即,如果参与者死亡且内存中的数据丢失,则写日志可以持久化到磁盘
如果数据的持久性是必要的,而不需要非常高的
我已经编写了一些actor类,我发现我必须掌握这些实体的生命周期。例如,每当初始化我的actor时,我希望调用一个方法,这样我就可以在消息队列上设置一些侦听器(或打开db连接等)
有类似的吗?我能想到的等价物是Spring的InitialisingBean和DisposableBean。Actor基本上是两种方法——构造函数和onMessage(Object):void
在它的生命周期中,没有任何东西能够自然地提供“连接”行为,这就给了您一些选择
使用主管演员创建其他演员。监督员负责监视、启动和
标签: Akka
reactive-programminglagom
只是想知道从持久性事件源参与者到lagom中的读取处理器的事件通知传递的保证,对于将更新查询端的读取处理器的事件通知,是否存在任何或没有消息持久性
我知道最终会有一致性,这很好,但我说的是向Cassandra读取处理器发出的事件处理程序通知。通过在持久实体中使用事件源和在读取端处理中使用偏移跟踪来保证事件处理
当您的持久实体命令处理程序持久化事件时,每个事件都以有序的方式存储
读端处理器通过轮询数据库中偏移量大于已处理的最后一个偏移量的事件来工作。因为所有事件和每个读端处理器的最新偏移量都会保留
启动应用程序时,我偶尔会收到以下消息。当我这样做时,应用程序无法初始化
Unhandled: DeadLetter(
Register(Actor[akka://myapp/user/StreamSupervisor-0/$$7#141722842],true,false),
Actor[akka://myapp/user/StreamSupervisor-0/$$7#141722842],
Actor[akka://myapp/system/IO-TCP/selectors/$a/
标签: Akka
akka-streamakka-httpalpakka
我很清楚,使用Actors当然是可能的:例如,使用AkkaHttp和类型化Actors
但我不清楚是否只使用AkkaStreams及其Alpakka connectors库(包括数据库),是否有可能执行常规CRUD/OLTP服务,或者只是从一个数据库到另一个数据库的数据复制,或者其他OLAP/批处理/流处理场景
如果你知道如何做到这一点,请指出一些细节,如果你能在github上提供一个例子,那就太好了
我认为可能的方式是,服务器涉及两个对话/有状态流转换:一个通过HTTP与外部世界进行对话,另一
1 2 3 4 5 6 ...
下一页 最后一页 共 17 页