版本
5.6.0
场景
我生成一个2秒之后过期的 JWT,立刻调用 connect,然后等待 JWT 过期之后,再调用 subscribe
预期
Client 能得到一个 JWT 过期的异常,或者 EMQX 可以主动断开连接
实际情况
EMQX Server会打印出如下日志
[warning] tag: AUTHZ, clientid: AsyncHiveMqtt5SubscriberTest, msg: authorization_permission_denied, peername: 172.24.0.1:45608, username: AsyncHiveMqtt5Subscriber, topic: testtopic/5, action: SUBSCRIBE(Q1), source: jwt
客户端没有任何Exception
从 EMQX 监控页面看到,连接一直保持者,该 client 订阅列表为空
测试代码:
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import io.jsonwebtoken.security.Keys;
import java.nio.charset.StandardCharsets;
import java.security.Key;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class AsyncHiveMqtt5SubscriberAfterTokenExpiry {
public static String generateWillExpiredJwt(String username) {
String secret = "emqxsecretemqxsecretemqxsecretemqxsecret";
Key key = Keys.hmacShaKeyFor(secret.getBytes());
String jwt = Jwts.builder()
.claim("username", username)
.claim("acl",
List.of(
Map.of("permission", "allow", "action", "publish", "topic", "testtopic/4"),
Map.of("permission", "allow", "action", "publish", "topic", "testtopic/5"),
Map.of("permission", "allow", "action", "subscribe", "topic", "testtopic/4"),
Map.of("permission", "allow", "action", "subscribe", "topic", "testtopic/5")
)
)
.signWith(key, SignatureAlgorithm.HS256)
.setExpiration(new Date(System.currentTimeMillis() + 2000))
.compact();
System.out.println("Generated JWT: " + jwt);
return jwt;
}
public static void main(String[] args) {
String subTopic = "testtopic/5";
MqttQos qos = MqttQos.AT_LEAST_ONCE;
String brokerUrl = "localhost";
String clientId = "AsyncHiveMqtt5SubscriberTest";
String username = "AsyncHiveMqtt5Subscriber";
// 生成一个两秒后过期的 jwt
String password = generateWillExpiredJwt(username);
final Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(clientId)
.simpleAuth()
.username(username)
.password(password.getBytes(StandardCharsets.UTF_8))
.applySimpleAuth()
.serverHost(brokerUrl)
.automaticReconnect()
.initialDelay(1, TimeUnit.SECONDS)
.maxDelay(10, TimeUnit.SECONDS)
.applyAutomaticReconnect()
.buildBlocking();
client.connect();
System.out.println("Connected to " + brokerUrl);
try {
Thread.sleep(3000);
client.toAsync().subscribeWith()
.topicFilter(subTopic)
.qos(qos)
.callback(mqtt5Publish -> System.out.println("Received message: " + new String(mqtt5Publish.getPayloadAsBytes())))
.send();
} catch (Exception e) {
throw new RuntimeException(e);
}
}