publish was broken by timeout

我在使用中出现 publish was broken by timeout 问题,环境:

	github.com/eclipse/paho.mqtt.golang v1.5.0
EMQX 	 23.2.7.2-emqx-2/11.1.8


package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	mqtt "github.com/eclipse/paho.mqtt.golang"
	"go.uber.org/zap"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"strings"
	"syscall"
	"time"
)

var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	fmt.Println("Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	fmt.Printf("Connect lost: %v", err)
}

func main() {
	InitLog()
	opts := createClientOptions()
	client := connectMQTT(opts)

	signalChan := make(chan os.Signal, 1)
	signal.Notify(signalChan, os.Interrupt, syscall.SIGTERM)

	go func() {
		<-signalChan
		fmt.Println("Received interrupt signal. Shutting down...")
		client.Disconnect(250)
		os.Exit(0)
	}()

	fime := readFime("1.txt")
	for {
		vc := fime[0]
		go publish(client, vc.Topic, vc.ID, vc.ID)
	}

}

func createClientOptions() *mqtt.ClientOptions {
	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://localhost:1883")
	opts.SetClientID("go_mqtt_client")
	opts.SetUsername("admin")
	opts.SetPassword("public")
	opts.SetDefaultPublishHandler(messagePubHandler)
	opts.SetAutoReconnect(true)
	opts.SetMaxReconnectInterval(time.Second * 10)
	opts.SetKeepAlive(time.Second * 30)
	opts.SetPingTimeout(time.Second * 10)
	opts.SetConnectTimeout(time.Second * 30)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	return opts
}

func connectMQTT(opts *mqtt.ClientOptions) mqtt.Client {
	zap.S().Infof("开始连接MQTT")
	client := mqtt.NewClient(opts)
	token := client.Connect()
	for !token.WaitTimeout(3 * time.Second) {
	}
	if err := token.Error(); err != nil {
		log.Printf("Error connecting to MQTT broker: %v", err)
		time.Sleep(time.Second * 5)
		return connectMQTT(opts)
	}
	return client
}
func readFime(path string) []Vc {
	// 读取 path 每一行用空格分割分隔的数据 第一个是 Topic 第二个是ID

	file, err := os.Open(path)
	if err != nil {
		log.Fatal(err)
	}
	defer file.Close()

	var vcs []Vc
	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		line := scanner.Text()
		fields := strings.Split(line, "\t")
		if len(fields) != 2 {
			log.Printf("Invalid line format: %s", line)
			continue
		}
		topic := fields[0]
		id, err := strconv.Atoi(fields[1])
		if err != nil {
			log.Printf("Invalid ID: %s", fields[1])
			continue
		}
		vcs = append(vcs, Vc{Topic: topic, ID: id})
	}
	if err := scanner.Err(); err != nil {
		log.Fatal(err)
	}
	return vcs
}

type Vc struct {
	Topic string
	ID    int
}

func publish(client mqtt.Client, topic string, i int, i2 int) {
	// 初始化随机数生成器的种子
	rand.Seed(time.Now().UnixNano())

	// 生成随机数

	var dataRows []DataRow
	for i3 := range 200 {
		randomNum := rand.Intn(21) // Intn返回一个[0, n)范围内的随机数
		dataRows = append(dataRows, DataRow{
			Name:  "信号-" + strconv.Itoa(i3),
			Value: strconv.Itoa(randomNum),
		})
	}

	//
	DataRowList := DataRowList{
		Time:               time.Now().Unix(),
		DeviceUid:          strconv.Itoa(i2),
		IdentificationCode: strconv.Itoa(i2),
		DataRows:           dataRows,
		Nc:                 strconv.Itoa(i2),
	}

	marshal, _ := json.Marshal(DataRowList)

	//fmt.Printf("发送消息: %s  消息主题: %s\n", DataRowList.Time, topic)
	token := client.Publish(topic, 0, false, marshal)

	if token.Wait() && token.Error() != nil {
		zap.S().Error(token.Error())
	}

	time.Sleep(1 * time.Second) // 暂停1秒

}

type DataRowList struct {
	Time               int64     `json:"Time"`               // 秒级时间戳
	DeviceUid          string    `json:"DeviceUid"`          // 能够产生网络通讯的唯一编码
	IdentificationCode string    `json:"IdentificationCode"` // 设备标识码
	DataRows           []DataRow `json:"DataRows"`
	Nc                 string    `json:"Nc"`
}
type DataRow struct {
	Name  string `json:"Name"`
	Value string `json:"Value"`
}

出现:

2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout
2024-08-26 14:02:22     error   test/main.go:151        publish was broken by timeout

请问如何进一步追踪问题

emqx version: 4.3.5

抱歉,我们对 GO 客户端不太熟悉。可能要社区的朋友们帮忙回答下了。