【分布式计算】java消息队列机制

        消息队列是一种在不同组件或应用之间进行数据传递的技术,通常用于处理异步通信。它允许消息的发送者(生产者)和接收者(消费者)之间进行解耦。


概念


        消息队列是一种先进先出(FIFO)的数据结构,它存储待处理的消息直到它们被消费。消息是生产者发送给队列的数据单元,消费者则从队列中读取这些消息进行处理。


原理


1. 生产者:
   - 生产者是创建消息的实体,它负责将消息发送到队列。生产者不需要关心消息的具体处理过程,只需确保消息正确发送到队列。

2. 消息队列:
   - 消息队列充当缓冲区,暂时存储从生产者那里发送过来的消息。队列管理消息的顺序,并确保按照发送的顺序逐一传递给消费者。

3. 消费者:
   - 消费者从消息队列中读取消息,并进行相应的处理。消费者可以是同一应用的其他部分,或者是完全独立的应用。

4. 消息处理:
   - 一旦消息被消费者读取,它可以被确认和删除,或者在处理失败时重新放回队列等待再次处理。


使用场景


异步处理:当应用执行耗时任务时,可以将任务封装成消息发送到队列,由消费者异步处理。
流量控制:在高流量事件如大促销或黑色星期五时,消息队列可以帮助缓冲入站流量,防止系统过载。
解耦服务:在微服务架构中,消息队列可以帮助减少服务之间的直接依赖,通过消息传递来通信,从而提高系统的可维护性和扩展性。


Java消息队列技术

在Java中,消息队列是一种数据结构或服务,用于在不同的应用组件或系统之间异步传递消息。它支持松耦合的架构,允许发送者和接收者独立地进行开发和扩展。消息队列可以帮助缓解高负载、增强系统的可伸缩性,并提供容错机制。下面是一些常见的Java消息队列技术:

1. Apache Kafka:
        Kafka是一个分布式流处理平台,它不仅能够处理消息队列的功能,还能处理复杂的事件流。它特别适合需要高吞吐量和可靠性的大规模数据处理场景。

2. RabbitMQ:
        RabbitMQ是一个开源消息代理,支持多种消息协议。它提供灵活的路由功能,能够保证消息的可靠传输。适合于复杂的消息传递需求和多种不同的通信模式。

3. ActiveMQ:
        Apache ActiveMQ是一个强大的开源消息代理,支持多种JMS(Java Message Service)协议和客户端语言。适用于那些需要JMS标准支持的企业应用。

4. Amazon SQS (Simple Queue Service):
        SQS是一个托管的消息队列服务,提供简单的Web服务API来完全管理队列的消息传输。它能够无限扩展,并且不需要预先安装消息队列基础设施。

5. Google Cloud Pub/Sub:
        Google的Pub/Sub提供了一种全球分布式的消息传递平台,适合处理大量数据的实时交换。


这个流程图展示了使用 ActiveMQ 实现消息队列的基本步骤,包括消息的发送和接收。以下是每个步骤的详细讲解。

 1. 创建 ConnectionFactory

`ConnectionFactory` 是一个接口,用于创建连接到消息中间件(ActiveMQ)的工厂。它是创建连接的起点。

ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");


2. 使用 ConnectionFactory 创建 Connection

通过 `ConnectionFactory` 创建一个连接对象 `Connection`。

Connection connection = connectionFactory.createConnection();

3. 启动 Connection

在使用连接之前,必须启动它。

connection.start();

4. 使用 Connection 创建一个或多个 JMS Session

通过 `Connection` 创建会话 `Session`。会话是生产和消费消息的上下文。

Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


5. 使用 Session 创建 Queue 或 Topic

通过会话创建队列(Queue)或主题(Topic)。队列用于点对点消息传递,主题用于发布/订阅消息传递。

Queue queue = session.createQueue("testQueue");
// 或者
Topic topic = session.createTopic("testTopic");

6. 使用 Session 创建 MessageProducer 或 MessageConsumer

根据需要创建消息生产者 `MessageProducer` 或消息消费者 `MessageConsumer`。

创建 MessageProducer

MessageProducer producer = session.createProducer(queue);

创建 MessageConsumer

MessageConsumer consumer = session.createConsumer(queue);

7. 发送消息

使用 `MessageProducer` 发送消息。

TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);

8. 接收消息

异步接收

设置消息监听器,当有消息到达时自动触发。

consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            try {
                String text = ((TextMessage) message).getText();
                System.out.println("Received: " + text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
});

同步接收

使用 `MessageConsumer.receive()` 方法同步接收消息。

Message message = consumer.receive();
if (message instanceof TextMessage) {
    String text = ((TextMessage) message).getText();
    System.out.println("Received: " + text);
}

完整代码示例

生产者代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Producer {
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue("testQueue");

        // 创建生产者
        MessageProducer producer = session.createProducer(queue);
        // 创建消息
        TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
        // 发送消息
        producer.send(message);

        // 关闭连接
        connection.close();
    }
}

消费者代码

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class Consumer {
    public static void main(String[] args) throws JMSException {
        // 创建连接工厂
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        // 创建连接
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 创建队列
        Queue queue = session.createQueue("testQueue");

        // 创建消费者
        MessageConsumer consumer = session.createConsumer(queue);

        // 同步接收消息
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            System.out.println("Received: " + text);
        }

        // 异步接收消息
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        String text = ((TextMessage) message).getText();
                        System.out.println("Received: " + text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        // 为了测试异步接收,保持程序运行一段时间
        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 关闭连接
        connection.close();
    }
}

具体应用

调用百度云api

使用消息队列实现一个调用百度智能云 API 的校园卡程序有助于提高系统的可扩展性和可靠性。消息队列可以解耦生产者和消费者,并实现异步处理。

实现步骤

1. 配置消息队列:
    - 安装并配置 RabbitMQ 或 ActiveMQ。
    - 配置 Spring Boot 项目以连接到消息队列。

2. 创建生产者(Producer):
    - 接收用户上传的图片。
    - 将图片编码为 Base64 格式,并发送到消息队列。

3. 创建消费者(Consumer):
    - 监听消息队列中的消息。
    - 调用百度智能云 API 进行图片识别。
    - 将识别结果存储以便后续查询。

4. 实现控制器(Controller):
    - 提供上传图片的接口。
    - 提供获取识别结果的接口。

系统架构图

+-------------------+        +--------------------+        +-------------------+
|                   |        |                    |        |                   |
|   User Uploads    |        |   Message Queue    |        |    API Consumer   |
|   (Controller)    | -----> |  (RabbitMQ/ActiveMQ)| -----> | (Baidu API Call)  |
|                   |        |                    |        |                   |
+-------------------+        +--------------------+        +-------------------+

具体实现

  • 用户上传图片:用户通过前端页面上传图片,图片通过 RecognitionController 接收并保存到消息队列。
  • 消息队列:图片以消息的形式存储在消息队列中,保证消息的可靠传递。
  • 消息处理RecognitionListener 监听消息队列,当有新消息到达时,调用百度智能云 API 进行图片识别,并将结果保存到 RecognitionService
  • 获取结果:用户可以通过访问 /api/resultPage 来获取最新的识别结果。

代码部分:

1. 配置消息队列:

   配置 `application.properties` 以连接到消息队列(以 ActiveMQ 为例):

   spring.activemq.broker-url=tcp://localhost:61616
   spring.activemq.user=admin
   spring.activemq.password=admin
   spring.jms.pool.enabled=true

2. 创建生产者(Producer):

  package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.jms.core.JmsTemplate;
   import org.springframework.stereotype.Service;

   @Service
   public class RecognitionService {

       @Autowired
       private JmsTemplate jmsTemplate;

       public void sendImageForRecognition(byte[] imageBytes) {
           jmsTemplate.convertAndSend("animal.recognition.queue", imageBytes);
       }

       // 其他方法...
   }

3. 创建消费者(Consumer):
 

  package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.jms.annotation.JmsListener;
   import org.springframework.stereotype.Component;

   @Component
   public class RecognitionListener {

       @Autowired
       private RecognitionService recognitionService;

       @JmsListener(destination = "animal.recognition.queue")
       public void processImage(byte[] imageBytes) {
           String result = recognitionService.recognizeImage(imageBytes);
           recognitionService.saveResult(result);
       }
   }

4. 实现控制器(Controller):

 package com.example.mq;

   import org.springframework.beans.factory.annotation.Autowired;
   import org.springframework.stereotype.Controller;
   import org.springframework.ui.Model;
   import org.springframework.web.bind.annotation.*;
   import org.springframework.web.multipart.MultipartFile;
   import org.springframework.web.servlet.view.RedirectView;

   import java.io.IOException;

   @Controller
   @RequestMapping("/api")
   public class RecognitionController {

       @Autowired
       private RecognitionService recognitionService;

       @PostMapping("/recognize")
       public RedirectView recognizeAnimal(@RequestParam("file") MultipartFile file, Model model) throws IOException {
           if (file.isEmpty()) {
               model.addAttribute("message", "File is empty");
               return new RedirectView("/errorPage.html");
           }

           byte[] bytes = file.getBytes();
           recognitionService.saveResult("等待识别结果...");
           recognitionService.sendImageForRecognition(bytes);

           return new RedirectView("/resultPage.html");
       }

       @GetMapping("/resultPage")
       @ResponseBody
       public String getResult() {
           return recognitionService.getResult();
       }
   }
 

消息队列的意义

异步处理:允许用户在上传图片后立即获得响应,而不是等待图片识别结果。
解耦:上传图片的部分和图片识别的部分可以独立开发和扩展。
负载均衡:可以轻松增加更多的消费者以处理高并发请求。
可靠性:消息队列持久化消息,确保即使在系统故障时也不会丢失消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/714877.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

(三十九)Vue之集中式的状态管理机制Vuex

目录 概念vuex的核心概念State(状态)Getters(获取器)Mutations(突变)Actions(动作) 搭建vuex环境基本使用getters的使用 上一篇:(三十八)Vue之插槽…

02 设计过程概述

02 设计过程概述 2-1 设计需求2-2 飞机设计的各个阶段2-2-1 概念设计2-2-2 初步设计2-2-3 详细设计 2-3 飞机概念设计的流程2-4 集成产品开发和飞机设计2-5 补充2-5-1 布局设计(Configuration Design)关键任务:作用和重要性:使用领…

SinoDB导入导出工具汇总

在进行数据迁移、数据库表备份、表重建以及批量数据加载时,我们经常希望数据处理过程能够更快点。本文是SinoDB导入导出工具的汇总,大家可以根据不同场景选择合适的SinoDB导入导出工具。 1. 各工具特点 通常利用dbschema工具导出数据库结构,…

父亲节 | 10位名家笔下的父亲,读懂那份孤独而深沉的父爱

Fathers Day 母爱如水,父爱如山。 相对于母爱的温柔,父亲的爱多了几分静默和深沉。 读完10位名家笔下的父亲,我们就会明白,到底亏欠了父亲多少。 不要让自己有“子欲养而亲不待”的后悔和遗憾, 多给父亲一些爱的表示&a…

《Cloud Native Data Center Networking》(云原生数据中心网络设计)读书笔记 -- 01 为什么需要一个新的网络架构

关于专栏 本专栏是工作之后阅读 Cloud Native Data Center Networking ( O’Reilly, 2019)的读书笔记。这本书是我在数据中心从事云网络工作的启蒙、扫盲读物。可惜,其中文版翻译并非尽善尽美,必须结合英文原版才能理解原作者要表…

期末算法复习

0-1背包问题(动态规划) 例题 算法思想: 动态规划的核心思想是将原问题拆分成若干个子问题,并利用已解决的子问题的解来求解更大规模的问题。 主要是状态转移方程和状态 算法描述: 初始化一个二维数组dp&#xff0…

通过命令行启动MySQL

通过命令行启动MySQL 右击,选择管理员运行 停止MySQL net stop你的服务名称 net stop MySQL启动MySQL net start你的服务名称 net start MySQL

绿色版DirectoryOpus功能强大且高度可定制的Windows文件管理器

Directory Opus(通常简称为DOpus)是一款功能强大且高度可定制的Windows文件管理器。它提供了许多超越Windows默认文件资源管理器(Explorer)的功能,使得文件和文件夹的管理变得更加高效和直观。以下是对Directory Opus的…

破解动态网页:如何用JavaScript获取自动消失的联想词

前几天在做数据分析时,我尝试获取某网站上输入搜索词后的联想词,输入搜索词后会弹出一个显示联想词的框。有趣的是,当我尝试通过按F12定位这个弹框在HTML中的位置时,输入框失去焦点后,联想词弹框就自动消失了。我观察到…

mySql的事务(操作一下)

目录 1. 简介2. 事务操作3. 四大特性4. 并发事务问题5. 脏读6. 不可重复读7. 幻读事务隔离级别参考链接 1. 简介 事务是一组操作的集合,它是一个不可分割的工作单位,事务会把所有的操作作为一个整体一起向系统提交或撤销操作请求,即这些操作…

华为od-C卷200分题目2 - 找城市

华为od-C卷200分题目2 - 找城市 题目描述 一个城市规划问题,一个地图有很多城市,两个城市之间只有一种路径,切断通往一 个城市i的所有路径之后,其他的城市形成了独立的城市群,这些城市群里最大的城 市数量&#xff0…

【Python】深入了解 AdaBoost:自适应提升算法

我们都找到天使了 说好了 心事不能偷藏着 什么都 一起做 幸福得 没话说 把坏脾气变成了好沟通 我们都找到天使了 约好了 负责对方的快乐 阳光下 的山坡 你素描 的以后 怎么抄袭我脑袋 想的 🎵 薛凯琪《找到天使了》 在机器学习的领域中&#x…

[C++ STL] vector 详解

标题:[C STL] vector 详解 水墨不写bug 目录 一、背景 二、vector简介 三、vector的接口介绍 (1)默认成员函数接口 i,构造函数(constructor) ii,析构函数(destructor&#xff0…

微信小程序地图

小程序中的 Map 地图功能详解 一&#xff0c;使用&#xff1a; 要在小程序中使用地图&#xff0c;首先需要在 WXML 文件中引入 Map 组件&#xff1a; <view class"container"><map id"myMap" longitude"{{longitude}}" latitude&quo…

开源语音合成模型ChatTTS本地部署结合内网穿透实现远程访问

文章目录 前言1. 下载运行ChatTTS模型2. 安装Cpolar工具3. 实现公网访问4. 配置ChatTTS固定公网地址 前言 本篇文章就教大家如何快速地在Windows中本地部署ChatTTS&#xff0c;并且我们还可以结合Cpolar内网穿透实现公网随时随地访问ChatTTS AI语言模型。 最像人声的AI来了&a…

超拟人大模型:AI心理健康服务的未来

摘要&#xff1a; 周末听了一场聆心智能关于情感LLM的分享&#xff0c;总结了相关内容如下。在人工智能技术的浪潮中&#xff0c;超拟人大模型技术为心理健康服务领域带来了革命性的变化。本文将分析超拟人大模型的进展、CharacterGLM模型的特点、Emohaa模型的应用以及心理健康…

42 mysql “+“ 操作符的实现

前言 问题来自于 chinaunix, mysql select 子句的小白问题 mysql 的一些基础的 算术运算符 的计算的实现 这里 整理如下 case, 执行之前 设置如下变量 set a 2; set b 3;select a b; select a b; select 1 3; select 1 3; select a b; select a b; select a b; …

实战项目: 负载均衡

0. 前言 这个项目使用了前后端,实现一个丐版的LeetCode刷题网站,并根据每台主机的实际情况,选择对应的主机,负载均衡的调度 0.1 所用技术与开发环境 所用技术: C STL 标准库 Boost 准标准库 ( 字符串切割 ) cpp- httplib 第三方开源网络库 ctemplate 第三方开源前端网…

EarMaster7.5.74官方版安装激活使用教程

EarMaster就是你音乐路上的良师益友。这是一款来自丹麦皇家音乐学院的多媒体音乐教育软件&#xff0c;针对视唱练耳为音乐学生&#xff0c;音乐爱好者以及音乐专业人员都带来了很多的帮助&#xff0c;让你们可以获得音乐家般的耳朵&#xff0c;通过专业视唱练耳培训考试&#x…

使用 PNPM 从零搭建 Monorepo,测试组件并发布

1 目标 通过 PNPM 创建一个 monorepo&#xff08;多个项目在一个代码仓库&#xff09;项目&#xff0c;形成一个通用的仓库模板。 这里以在该 monorepo 项目中搭建 web components 类型的组件库为例&#xff0c;介绍从仓库搭建、组件测试到组件发布的整个流程。 这个仓库既可…