Akka、SQS 和 Camel 的消费者投票率

本文介绍了Akka、SQS 和 Camel 的消费者投票率的处理方法,对大家解决问题具有一定的参考价值

问题描述

我正在进行的一个项目需要从 SQS 读取消息,因此我决定使用 Akka 来分发这些消息的处理.

A project I'm working on requires the reading of messages from SQS, and I decided to use Akka to distribute the processing of these messages.

由于 Camel 支持 SQS,并且在 Consumer 类中内置了供 Akka 使用的功能,我认为最好以这种方式实现端点并读取消息,尽管我没有看到很多人的例子这样做.

Since SQS is support by Camel, and there is functionality built in for use in Akka in the Consumer class, I imagined it would be best to implement the endpoint and read messages this way, though I had not seen many examples of people doing so.

我的问题是我无法足够快地轮询队列以保持队列为空或接近空.我最初的想法是,我可以让消费者以 X/s 的速率通过 Camel 从 SQS 接收消息.从那里,我可以简单地创建更多的消费者来达到我需要处理消息的速度.

My problem is that I cannot poll my queue quickly enough to keep my queue empty, or near empty. What I originally thought was that I could get a Consumer to receive messages over Camel from SQS at a rate of X/s. From there, I could simply create more Consumers to get up to the rate at which I needed messages processed.

我的消费者:

import akka.camel.{CamelMessage, Consumer}
import akka.actor.{ActorRef, ActorPath}

class MyConsumer() extends Consumer {
  def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)"
  var count = 0

  def receive = {
    case msg: CamelMessage => {
      count += 1
    }
    case _ => {
      println("Got something else")
    }
  }

  override def postStop(){
    println("Count for actor: " + count)
  }
}

如图所示,我设置了 delay=1 以及 &maxMessagesPerPoll=10 来提高消息的速率,但我无法生成多个具有相同端点的消费者.

As shown, I've set delay=1 as well as &maxMessagesPerPoll=10 to improve the rate of messages, but I'm unable to spawn multiple consumers with the same endpoint.

我在文档中读到 默认情况下假定端点不支持多个消费者. 我相信这也适用于 SQS 端点,因为产生多个消费者只会给我一个消费者运行系统一分钟后,输出消息是 Count for actor: x 而不是其他输出 Count for actor: 0.

I read in the docs that By default endpoints are assumed not to support multiple consumers. and I believe this holds true for SQS endpoints as well, as spawning multiple consumers will give me only one consumer where after running the system for a minute, the output message is Count for actor: x instead of the others which output Count for actor: 0.

如果这有用的话;在单个消费者上使用此当前实现,我每秒可以读取大约 33 条消息.

If this is at all useful; I'm able to read approximately 33 messages/second with this current implementation on the single consumer.

这是从 Akka 的 SQS 队列中读取消息的正确方法吗?如果是这样,我有没有办法让它向外扩展,以便我可以将消息消耗率提高到接近 900 条消息/秒?

Is this the proper way to be reading messages from an SQS queue in Akka? If so, is there way I can get this to scale outward so that I can increase my rate of message consumption closer to that of 900 messages/second?

推荐答案

遗憾的是 Camel 目前不支持在 SQS 上并行消费消息.

Sadly Camel does not currently support parallel consumption of messages on SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

为了解决这个问题,我编写了自己的 Actor 来使用 aws-java-sdk 轮询批处理消息 SQS.

To address this I've written my own Actor to poll batch messages SQS using the aws-java-sdk.

  def receive = {
    case BeginPolling => {
      // re-queue sending asynchronously
      self ! BeginPolling
      // traverse the response
      val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry]
      val messages = sqs.receiveMessage(receiveMessageRequest).getMessages
      messages.toList.foreach {
        node => {
          deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle))
          //log.info("Node body: {}", node.getBody)
          filterSupervisor ! node.getBody
        }
      }
      if(deleteEntryList.size() > 0){
        val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList)
        sqs.deleteMessageBatch(deleteMessageBatchRequest)
      }
    }

    case _ => {
      log.warning("Unknown message")
    }
  }

虽然我不确定这是否是最好的实现,当然可以对其进行改进,以使请求不会经常遇到空队列,但它确实适合我目前能够从同一队列轮询消息的需求排队.

Though I'm not certain if this is the best implementation, and it could of course be improved upon so that requests are not constantly hitting an empty queue, it does suit my current needs of being able to poll messages from the same queue.

使用这个从 SQS 获取大约 133(消息/秒)/actor.

Getting about 133 (messages/second)/actor from SQS with this.

这篇关于Akka、SQS 和 Camel 的消费者投票率的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,WP2

admin_action_{$_REQUEST[‘action’]}

do_action( "admin_action_{$_REQUEST[‘action’]}" )动作钩子::在发送“Action”请求变量时激发。Action Hook: Fires when an ‘action’ request variable is sent.目录锚点:#说明#源码说明(Description)钩子名称的动态部分$_REQUEST['action']引用从GET或POST请求派生的操作。源码(Source)更新版本源码位置使用被使用2.6.0 wp-admin/admin.php:...

日期:2020-09-02 17:44:16 浏览:1168

admin_footer-{$GLOBALS[‘hook_suffix’]}

do_action( "admin_footer-{$GLOBALS[‘hook_suffix’]}", string $hook_suffix )操作挂钩:在默认页脚脚本之后打印脚本或数据。Action Hook: Print scripts or data after the default footer scripts.目录锚点:#说明#参数#源码说明(Description)钩子名的动态部分,$GLOBALS['hook_suffix']引用当前页的全局钩子后缀。参数(Parameters)参数类...

日期:2020-09-02 17:44:20 浏览:1069

customize_save_{$this->id_data[‘base’]}

do_action( "customize_save_{$this->id_data[‘base’]}", WP_Customize_Setting $this )动作钩子::在调用WP_Customize_Setting::save()方法时激发。Action Hook: Fires when the WP_Customize_Setting::save() method is called.目录锚点:#说明#参数#源码说明(Description)钩子名称的动态部分,$this->id_data...

日期:2020-08-15 15:47:24 浏览:806

customize_value_{$this->id_data[‘base’]}

apply_filters( "customize_value_{$this->id_data[‘base’]}", mixed $default )过滤器::过滤未作为主题模式或选项处理的自定义设置值。Filter Hook: Filter a Customize setting value not handled as a theme_mod or option.目录锚点:#说明#参数#源码说明(Description)钩子名称的动态部分,$this->id_date['base'],指的是设置...

日期:2020-08-15 15:47:24 浏览:898

get_comment_author_url

过滤钩子:过滤评论作者的URL。Filter Hook: Filters the comment author’s URL.目录锚点:#源码源码(Source)更新版本源码位置使用被使用 wp-includes/comment-template.php:32610...

日期:2020-08-10 23:06:14 浏览:930

network_admin_edit_{$_GET[‘action’]}

do_action( "network_admin_edit_{$_GET[‘action’]}" )操作挂钩:启动请求的处理程序操作。Action Hook: Fires the requested handler action.目录锚点:#说明#源码说明(Description)钩子名称的动态部分$u GET['action']引用请求的操作的名称。源码(Source)更新版本源码位置使用被使用3.1.0 wp-admin/network/edit.php:3600...

日期:2020-08-02 09:56:09 浏览:876

network_sites_updated_message_{$_GET[‘updated’]}

apply_filters( "network_sites_updated_message_{$_GET[‘updated’]}", string $msg )筛选器挂钩:在网络管理中筛选特定的非默认站点更新消息。Filter Hook: Filters a specific, non-default site-updated message in the Network admin.目录锚点:#说明#参数#源码说明(Description)钩子名称的动态部分$_GET['updated']引用了非默认的...

日期:2020-08-02 09:56:03 浏览:863

pre_wp_is_site_initialized

过滤器::过滤在访问数据库之前是否初始化站点的检查。Filter Hook: Filters the check for whether a site is initialized before the database is accessed.目录锚点:#源码源码(Source)更新版本源码位置使用被使用 wp-includes/ms-site.php:93910...

日期:2020-07-29 10:15:38 浏览:833

WordPress 的SEO 教学:如何在网站中加入关键字(Meta Keywords)与Meta 描述(Meta Description)?

你想在WordPress 中添加关键字和meta 描述吗?关键字和meta 描述使你能够提高网站的SEO。在本文中,我们将向你展示如何在WordPress 中正确添加关键字和meta 描述。为什么要在WordPress 中添加关键字和Meta 描述?关键字和说明让搜寻引擎更了解您的帖子和页面的内容。关键词是人们寻找您发布的内容时,可能会搜索的重要词语或片语。而Meta Description则是对你的页面和文章的简要描述。如果你想要了解更多关于中继标签的资讯,可以参考Google的说明。Meta 关键字和描...

日期:2020-10-03 21:18:25 浏览:1720

谷歌的SEO是什么

SEO (Search Engine Optimization)中文是搜寻引擎最佳化,意思近于「关键字自然排序」、「网站排名优化」。简言之,SEO是以搜索引擎(如Google、Bing)为曝光媒体的行销手法。例如搜寻「wordpress教学」,会看到本站的「WordPress教学:12个课程…」排行Google第一:关键字:wordpress教学、wordpress课程…若搜寻「网站架设」,则会看到另一个网页排名第1:关键字:网站架设、架站…以上两个网页,每月从搜寻引擎导入自然流量,达2万4千:每月「有机搜...

日期:2020-10-30 17:23:57 浏览:1308