微技术

Covering the technology field, sharing news and experience.

Integrating NATS with Spring Boot

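NATS is an open-source, lightweight, high-performance cloud-native messaging system written in Go. It handles the delivery, discovery, and exchange of messages in distributed systems. NATS messages are addressed by subject, so publishers and subscribers do not depend on each other's network location.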
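Subject-based addressing can be pictured with a small, library-free sketch: the publisher and the subscriber share only a subject name, never an address. `SubjectBus` below is a hypothetical in-memory stand-in for the server and is not part of any NATS client.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a NATS server (illustration only).
public class SubjectBus {
    private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

    // Register a handler for an exact subject name.
    public void subscribe(String subject, Consumer<String> handler) {
        subscribers.computeIfAbsent(subject, k -> new ArrayList<>()).add(handler);
    }

    // Deliver the payload to every handler on the subject; returns the delivery count.
    public int publish(String subject, String payload) {
        List<Consumer<String>> handlers =
                subscribers.getOrDefault(subject, Collections.emptyList());
        for (Consumer<String> handler : handlers) {
            handler.accept(payload);
        }
        return handlers.size();
    }

    public static void main(String[] args) {
        SubjectBus bus = new SubjectBus();
        List<String> received = new ArrayList<>();
        bus.subscribe("data", received::add);
        bus.publish("data", "hello");     // delivered: one subscriber on "data"
        bus.publish("other", "ignored");  // dropped: nobody subscribed to "other"
        System.out.println(received);     // prints [hello]
    }
}
```

The publisher never learns who received the message, which is why subscribers can be added or moved without touching the publishing side.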

Example

1. application-nats.yml

spring:
  nats:
    natsUrls:
      - nats://192.168.2.246:4222

2. Java JUnit test


import com.alibaba.fastjson.JSON;
import io.nats.client.Connection;
import io.nats.client.Message;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import java.nio.charset.StandardCharsets;
import java.time.Duration;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
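// @EnableNats and @Subscribe come from the NATS Spring Boot starter in use; their imports are not shown in the original.
@EnableNats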
public class ConsoleApiTest {

    @Autowired
    private Connection connection;

    @Subscribe("data")
    public void sub(Message message) {
        log.info("<<<=== subject={}, data={}", message.getSubject(), new String(message.getData(), StandardCharsets.UTF_8));
    }

    @Test
    public void pub() {
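        log.info(connection.getConnectedUrl());
        // Fire-and-forget publish: no reply subject is attached.
        connection.publish("data", "data测试数据".getBytes(StandardCharsets.UTF_8));
        try {
            // A zero timeout waits for the flush without a time limit.
            connection.flush(Duration.ZERO);
        } catch (Exception e) {
            log.error("flush failed", e);
        }
        try {
            Message data = connection.request("data", "data request测试数据".getBytes(StandardCharsets.UTF_8), Duration.ofSeconds(1));
            if (data == null) {
                // request() returns null when no reply arrives within the timeout.
                log.warn("no reply on subject data within 1s");
            } else {
                log.info("subject={}, data={}, replyTo={}", data.getSubject(), new String(data.getData(), StandardCharsets.UTF_8), data.getReplyTo());
                log.info(JSON.toJSONString(data));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("request interrupted", e);
        }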
//        while (true);
    }
}
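The test publishes UTF-8 bytes, so the receiving side should decode with the same charset rather than rely on the platform default. A minimal, library-free sketch of that round trip follows; the class name `PayloadCodec` is hypothetical.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper showing a charset-safe payload round trip.
public class PayloadCodec {
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("data测试数据");
        // 4 ASCII bytes plus 4 CJK characters at 3 bytes each.
        System.out.println(payload.length);   // prints 16
        System.out.println(decode(payload));  // prints data测试数据

        // Decoding with a different charset silently garbles the text.
        String wrong = new String(payload, StandardCharsets.ISO_8859_1);
        System.out.println(wrong.equals("data测试数据"));  // prints false
    }
}
```

Naming the charset on both sides keeps the Java test and the Go responder in agreement regardless of the JVM's default encoding.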

3. Test output:

17:19:00.501 [main] INFO  c.m.c.c.a.ConsoleApiTest - [message,34] - nats://192.168.2.246:4222
17:19:00.524 [nats:3] INFO  c.m.c.c.a.ConsoleApiTest - [message,29] - <<<===subject=data, data=data测试数据
17:19:00.528 [nats:3] INFO  c.m.c.c.a.ConsoleApiTest - [message,29] - <<<===subject=data, data=data request测试数据

Done.

Go (reply) and Java (request)

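To see the reply returned to the Java request, run the reply.go test from section 4, "Request-reply Go test code", of the previous article, "Installing and using NATS with Go".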
reply.go

package main

import (
    "encoding/json"
    "github.com/nats-io/nats.go"
    "log"
    "runtime"
)

func main() {
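    nc, err := nats.Connect("nats://192.168.2.246:4222")
    if err != nil {
        log.Fatal("connect error: ", err)
    }
    // Answer every request on "data" by publishing to the message's reply subject.
    _, err = nc.Subscribe("data", func(m *nats.Msg) {
        result, _ := json.Marshal(m)
        log.Println("<<<=== ", string(m.Data), "\r\nm=", string(result))
        //time.Sleep(5 * time.Second)
        pubstr := "reply.go Publish content."
        log.Println("===>>> ", pubstr, ", replyTo=", m.Reply)
        // A plain publish carries no reply subject, so there is nothing to answer.
        if m.Reply == "" {
            return
        }
        if err := nc.Publish(m.Reply, []byte(pubstr)); err != nil {
            log.Println("reply failed: ", err)
        }
    })
    if err != nil {
        log.Fatal("subscribe error: ", err)
    }
    // Park the main goroutine while the NATS goroutines keep serving requests.
    runtime.Goexit()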
}

Run:


go run reply.go
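Why the Java request only gets an answer once a responder such as reply.go is running can be pictured with a library-free Java sketch of the request-reply pattern. The requester publishes with a unique reply subject and waits on it; a responder publishes its answer to that subject. `MiniBus` and `Msg` are hypothetical, simplified stand-ins (single-threaded, one handler per subject), not the jnats API or the server's actual implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

// Hypothetical in-memory model of request-reply (illustration only).
public class MiniBus {
    static final class Msg {
        final String subject;
        final String replyTo;
        final String data;

        Msg(String subject, String replyTo, String data) {
            this.subject = subject;
            this.replyTo = replyTo;
            this.data = data;
        }
    }

    private final Map<String, Consumer<Msg>> handlers = new HashMap<>();

    // Register (or replace) the single handler for a subject.
    public void subscribe(String subject, Consumer<Msg> handler) {
        handlers.put(subject, handler);
    }

    // Deliver to the handler on the subject, if any; replyTo may be null.
    public void publish(String subject, String replyTo, String data) {
        Consumer<Msg> handler = handlers.get(subject);
        if (handler != null) {
            handler.accept(new Msg(subject, replyTo, data));
        }
    }

    // Publish with a unique reply subject and wait for one answer; null if none arrives in time.
    public String request(String subject, String data, long timeoutMillis) {
        String inbox = "_INBOX." + UUID.randomUUID();
        CompletableFuture<String> reply = new CompletableFuture<>();
        subscribe(inbox, m -> reply.complete(m.data));
        try {
            publish(subject, inbox, data);
            return reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            handlers.remove(inbox);
        }
    }

    public static void main(String[] args) {
        MiniBus bus = new MiniBus();

        // A subscriber that only logs, like sub() in the JUnit test: the request times out.
        bus.subscribe("data", m -> System.out.println("received " + m.data));
        System.out.println(bus.request("data", "ping", 100));  // prints null

        // A responder that publishes to the reply subject, like reply.go.
        bus.subscribe("data", m -> bus.publish(m.replyTo, null, "pong"));
        System.out.println(bus.request("data", "ping", 100));  // prints pong
    }
}
```

In the sketch, receiving a request is not the same as answering it: only a handler that publishes to the reply subject completes the request.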