用一个过期的 JWT 订阅,Client 永远无法收到消息

版本

5.6.0

场景

我生成一个2秒之后过期的 JWT,立刻调用 connect,然后等待 JWT 过期之后,再调用 subscribe

预期

Client 能得到一个 JWT 过期的异常,或者 EMQX 可以主动断开连接

实际情况

EMQX Server会打印出如下日志

 [warning] tag: AUTHZ, clientid: AsyncHiveMqtt5SubscriberTest, msg: authorization_permission_denied, peername: 172.24.0.1:45608, username: AsyncHiveMqtt5Subscriber, topic: testtopic/5, action: SUBSCRIBE(Q1), source: jwt

客户端没有任何Exception

从 EMQX 监控页面看到,连接一直保持者,该 client 订阅列表为空

测试代码:

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.Keys;

import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class AsyncHiveMqtt5SubscriberAfterTokenExpiry {

    public static String generateWillExpiredJwt(String username) {
        String secret = "emqxsecretemqxsecretemqxsecretemqxsecret";
        Key key = Keys.hmacShaKeyFor(secret.getBytes());
        String jwt = Jwts.builder()
                .claim("username", username)
                .claim("acl",
                        List.of(
                                Map.of("permission", "allow", "action", "publish", "topic", "testtopic/4"),
                                Map.of("permission", "allow", "action", "publish", "topic", "testtopic/5"),
                                Map.of("permission", "allow", "action", "subscribe", "topic", "testtopic/4"),
                                Map.of("permission", "allow", "action", "subscribe", "topic", "testtopic/5")
                        )
                )
                .signWith(key, SignatureAlgorithm.HS256)
                .setExpiration(new Date(System.currentTimeMillis() + 2000))
                .compact();
        System.out.println("Generated JWT: " + jwt);
        return jwt;
    }

    public static void main(String[] args) {
        String subTopic = "testtopic/5";
        MqttQos qos = MqttQos.AT_LEAST_ONCE;
        String brokerUrl = "localhost";
        String clientId = "AsyncHiveMqtt5SubscriberTest";
        String username = "AsyncHiveMqtt5Subscriber";
        // 生成一个两秒后过期的 jwt
        String password = generateWillExpiredJwt(username);

        final Mqtt5BlockingClient client = Mqtt5Client.builder()
                .identifier(clientId)
                .simpleAuth()
                .username(username)
                .password(password.getBytes(StandardCharsets.UTF_8))
                .applySimpleAuth()
                .serverHost(brokerUrl)
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS)
                .maxDelay(10, TimeUnit.SECONDS)
                .applyAutomaticReconnect()
                .buildBlocking();

        client.connect();
        System.out.println("Connected to " + brokerUrl);

        try {
            Thread.sleep(3000);
            client.toAsync().subscribeWith()
                    .topicFilter(subTopic)
                    .qos(qos)
                    .callback(mqtt5Publish -> System.out.println("Received message: " + new String(mqtt5Publish.getPayloadAsBytes())))
                    .send();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

EMQX 中,「客户端认证」 以及 「客户端授权」 是怎样配置的呢?


生成一个2秒之后过期的 JWT,立刻调用 connect,然后等待 JWT 过期之后,再调用 subscribe

是使用了 JWT 认证么?
AuthZ 中没有提供使用 JWT 验证权限的功能。

开源版是有 JWT 权限验证的

这是认证,不是授权。
显然,在2s 的有效期内连接,有效的 jwt 是能够通过认证的。也即客户端能够正常连接。
这之后的 subscribe 就是「授权」所关心的事情了。

认证与授权是两个事情。
https://www.emqx.io/docs/zh/latest/access-control/authn/authn.html
https://www.emqx.io/docs/zh/latest/access-control/authz/authz.html

谢谢,我明白了,我将 authz 的 deny_action 设置为 disconnect。达到了我想要的效果。

但是我在没有设置这个选项之前,使用过期的 token publish 一条消息会收到一个 ReasonCode 135。

在默认情况下,使用过期的 token 做 publish 和 subscribe 得到的结果不一致,这个很奇怪

Reason Code 135 是 not_authorized,PUB/SUB 操作未授权。

你的客户端鉴权(AuthZ)部分是怎么配的?

GET http://localhost:18083/api/v5/authorization/settings

{
  "cache": {
    "enable": true,
    "excludes": [],
    "max_size": 32,
    "ttl": "1m"
  },
  "deny_action": "ignore",
  "no_match": "deny"
}

使用这个配置时,用过期的 JWT publish 一条消息,收到了 135

GET http://[HOST]:18083/api/v5/authorization/sources

看一下 authz 数据源的配置
有某个 authz 数据源拒绝了 publish 消息

GET http://[HOST]:18083/api/v5/authorization/sources

{
  "sources": [
    {
      "enable": true,
      "rules": "%%--------------------------------------------------------------------\n%% -type(ipaddr() :: {ipaddr, string()}).\n%%\n%% -type(ipaddrs() :: {ipaddrs, [string()]}).\n%%\n%% -type(username() :: {user | username, string()} | {user | username, {re, regex()}}).\n%%\n%% -type(clientid() :: {client | clientid, string()} | {client | clientid, {re, regex()}}).\n%%\n%% -type(who() :: ipaddr() | ipaddrs() | username() | clientid() |\n%%                {'and', [ipaddr() | ipaddrs() | username() | clientid()]} |\n%%                {'or',  [ipaddr() | ipaddrs() | username() | clientid()]} |\n%%                all).\n%%\n%% -type(action() :: subscribe | publish | all).\n%%\n%% -type(topic_filters() :: string()).\n%%\n%% -type(topics() :: [topic_filters() | {eq, topic_filters()}]).\n%%\n%% -type(permission() :: allow | deny).\n%%\n%% -type(rule() :: {permission(), who(), action(), topics()} | {permission(), all}).\n%%--------------------------------------------------------------------\n\n{allow, {username, {re, \"^dashboard$\"}}, subscribe, [\"$SYS/#\"]}.\n\n{allow, {ipaddr, \"127.0.0.1\"}, all, [\"$SYS/#\", \"#\"]}.\n\n{deny, all, subscribe, [\"$SYS/#\", {eq, \"#\"}]}.\n\n{allow, all}.\n",
      "type": "file"
    }
  ]
}

在 web 页面查看 File 授权内容如下

%%--------------------------------------------------------------------
%% -type(ipaddr() :: {ipaddr, string()}).
%%
%% -type(ipaddrs() :: {ipaddrs, [string()]}).
%%
%% -type(username() :: {user | username, string()} | {user | username, {re, regex()}}).
%%
%% -type(clientid() :: {client | clientid, string()} | {client | clientid, {re, regex()}}).
%%
%% -type(who() :: ipaddr() | ipaddrs() | username() | clientid() |
%%                {'and', [ipaddr() | ipaddrs() | username() | clientid()]} |
%%                {'or',  [ipaddr() | ipaddrs() | username() | clientid()]} |
%%                all).
%%
%% -type(action() :: subscribe | publish | all).
%%
%% -type(topic_filters() :: string()).
%%
%% -type(topics() :: [topic_filters() | {eq, topic_filters()}]).
%%
%% -type(permission() :: allow | deny).
%%
%% -type(rule() :: {permission(), who(), action(), topics()} | {permission(), all}).
%%--------------------------------------------------------------------

{allow, {username, {re, "^dashboard$"}}, subscribe, ["$SYS/#"]}.

{allow, {ipaddr, "127.0.0.1"}, all, ["$SYS/#", "#"]}.

{deny, all, subscribe, ["$SYS/#", {eq, "#"}]}.

{allow, all}.

我在关闭了 file authz 之后,执行这个 case,依旧返回 135

import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5PubAckException;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5PublishResult;
import com.hivemq.client.mqtt.mqtt5.message.publish.puback.Mqtt5PubAck;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.Keys;
import lombok.extern.slf4j.Slf4j;

import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
public class HiveMqtt5PublisherTokenExpiry2 {

    public static String generateWillExpiredJwt(String username) {

        String secret = "emqxsecretemqxsecretemqxsecretemqxsecret";

        // 使用密钥生成 Key 对象
        Key key = Keys.hmacShaKeyFor(secret.getBytes());
        // 构建JWT
        String jwt = Jwts.builder()
                .claim("username", username) // 自定义声明
                .claim("acl",
                        Map.of("sub", List.of("testtopic/4", "testtopic/5"),
                                "pub", List.of("testtopic/4", "testtopic/5"))
                )
                .signWith(key, SignatureAlgorithm.HS256)
                .setExpiration(new Date(System.currentTimeMillis() + 2000))
                .compact();
        System.out.println("Generated JWT: " + jwt);
        return jwt;
    }

    public static void main(String[] args) {
        String pubTopic = "testtopic/5";
        String content = "hiveClient 来的消息";
        MqttQos qos = MqttQos.AT_LEAST_ONCE;
        String brokerUrl = "localhost";
        String clientId = "hivePublisherClientTest";
        String username = "hivePublisher";
        byte[] password = generateWillExpiredJwt(username).getBytes(StandardCharsets.UTF_8);

        final Mqtt5BlockingClient client = Mqtt5Client.builder()
                .identifier(clientId)
                .simpleAuth()
                    .username(username)
                    .password(password)
                    .applySimpleAuth()
                .serverHost(brokerUrl)
                .automaticReconnect()
                .initialDelay(1, TimeUnit.SECONDS)
                .maxDelay(1, TimeUnit.SECONDS)
                .applyAutomaticReconnect()
                .addDisconnectedListener(context -> {
                    // 如果不添加这段代码,会一直尝试重新连接
                    log.info("Disconnected from {}, attempts={}", brokerUrl, context.getReconnector().getAttempts());
                    if (context.getReconnector().getAttempts() > 10 && context.getReconnector().isReconnect()) {
                        log.info("Reconnect failed, exit");
                        context.getReconnector().reconnect(false);
                    }
                })
                .buildBlocking();

        try {
            client.connect();
            log.info("Connected to {}", brokerUrl);

            Thread.sleep(3000);

            Mqtt5PublishResult send = client.publishWith()
                    .topic(pubTopic)
                    .qos(qos)
                    .payload(content.getBytes())
                    .send();
            System.out.println(send);
        } catch (Exception e) {
            if (e instanceof Mqtt5PubAckException mqtt5PubAckException) {
                Mqtt5PubAck pubAck = mqtt5PubAckException.getMqttMessage();
                // 这里的 error code 是 135
                log.error("publish failed, reasonCode={}, reasonString={}", pubAck.getReasonCode().getCode(), pubAck.getReasonString());
            }
        }
    }

}

所以我认为,我的这个 case 是 jwt 模块返回了 135

抱歉,我重新看了一下你的 case 代码。是用了 jwt 中携带 acl 权限列表的功能对吧。

https://www.emqx.io/docs/zh/v5.6/access-control/authn/jwt.html#权限列表

那看起来在 jwt 过期后 subscribe ,由于 jwt 过期导致 acl 失败是正常的。

但有一点需要说明:“得到一个 JWT 过期的异常” 是不可能的,因为在 subscribe 的时候,不会再次传输 jwt , EMQX 仍然会使用 CONNECT 时携带的那个“过期的” jwt 进行权限验证。
Client 只能得到一个 135 的原因码,reason = not_authorized,这是在 MQTT 协议层发生的,所以 java 调用栈也不会得到 jwt 过期的异常。

我重新梳理一下我的问题

1

最开始的时候,我的设置是 deny_action=ignore

首先 connect 成功,调用 publish 的时候,jwt 已经过期,这个时候,生产者得到的 reasonCode=135

但是在执行消费者的时候
同样先 connect 成功,调用 subscribe 的时候,jwt 已过期,这个时候,消费者客户端得不到任何反馈(一直保持连接成功,监控页面显示订阅的 topic 为空)。

这个时候很明显,生产者得到了发布失败的响应,但是消费者没有响应,我认为这是不合理的。

2

我现在将 deny_action = disconnect

publish 的时候,jwt 过期,客户端断开连接,并收到 reasonCode = 135

subscribe 的时候,jwt 过期,客户端断开连接,并收到 reasonCode = 135

这达到了我的目的。

最后,我想知道情况 #1 是一个 bug,还是设计如此

case #2 中的描述是符合预期,但 #1 中的描述:“subscribe 时无任何反馈不太对”

正常情况下在 jwt 过期后 subscribe, 也会得到一个包含 reasonCode=135 的 SUBACK 报文。
你可以抓包看一下?抓包结果是最准确的。如果抓包的 SUBACK 中没有 reasonCode=135 那就是 bug,如果有那就是正常的

不过就 case #2 的行为来看,你用的 MQTT 框架应该是没有处理 SUBACK 报文

deny action: ignore
给客户端回复 reasonCode=135 的 SUBACK 报文


deny action: disconnect
给客户端回复 reasonCode=135 的 DISCONNECT 报文

通过抓包我发现了,EMQX 返回了 reasonCode=135,所以应该是客户端的问题。感谢您的支持