stm32开发之netxduo组件之mqtt客户端的使用记录

前言

1使用mqtt协议的简单示例记录

代码

MQTT服务端(C# 编写,使用MQTTnet提供的示例代码)

主程序


namespace ConsoleApp1;

public class Program
{

    public static async Task Main(string[] args)
    {
      await Run_Server_With_Logging();

    }
    
    
   
}
   public static async Task Run_Server_With_Logging()
    {
        /*
         * This sample starts a simple MQTT server and prints the logs to the output.
         *
         * IMPORTANT! Do not enable logging in live environment. It will decrease performance.
         *
         * See sample "Run_Minimal_Server" for more details.
         */

        var mqttFactory = new MqttFactory(new ConsoleLogger());

        var mqttServerOptions = new MqttServerOptionsBuilder().WithDefaultEndpoint().Build();

        using (var mqttServer = mqttFactory.CreateMqttServer(mqttServerOptions))
        {
            await mqttServer.StartAsync();

            Console.WriteLine("Press Enter to exit.");
            Console.ReadLine();

            // Stop and dispose the MQTT server if it is no longer needed!
            await mqttServer.StopAsync();
        }
    }

NexDuo组件的编写

/*
 * Copyright (c) 2024-2024,shchl
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2024-4-15     shchl   first version
 */
#include "includes.h"

#if APP_COMPONENT_NX_ENABLE
#include "nx_api.h"
#include "nx_stm32_eth_driver.h"

#define     APP_NX_PACKET_SIZE              1536 /*数据包大小*/
#define     APP_NX_PACKET_POOL_SIZE         ((sizeof(NX_PACKET) + APP_NX_PACKET_SIZE) * 16)

#define     APP_NX_IP_STACK_SIZE            2048
#define     APP_NX_IP_PRIORITY              10

#define     APP_NX_ARP_CACHE_SIZE           1024

#define     APP_NX_IP_ADDR                  IP_ADDRESS(192,168,8,9)
#define     APP_NX_IP_SUB_MASK              IP_ADDRESS(255,255,255,0)

/*
*******************************************************************************************************
*                               外部引入变量
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               变量
*******************************************************************************************************
*/

void *loc_ip_area;
void *packet_pool_area;
void *arp_cache_area;

NX_PACKET_POOL g_packet_pool;/*全局packet pool 池*/
NX_IP g_loc_ip; /*全局 本地ip实例*/
/*
*********************************************************************************************************
*                                       静态全局变量
*********************************************************************************************************
*/

/*
*********************************************************************************************************
*                                      函数声明
*********************************************************************************************************
*/


/*
*********************************************************************************************************
*                                      外部函数
*********************************************************************************************************
*/
int nx_application_define(void) {
    UINT status;
    /*内存分配*/
    packet_pool_area = app_malloc(APP_NX_PACKET_POOL_SIZE);
    loc_ip_area = app_malloc(APP_NX_IP_STACK_SIZE);
    arp_cache_area = app_malloc(APP_NX_ARP_CACHE_SIZE);
    nx_system_initialize();
    status = app_nx_ip_instance_create();
    if (status) {
        tx_printf("app_nx_ip_instance_create error:%d\r\n", status);
    }

    return NX_SUCCESS;
}

TX_APP_DEFINE_EXPORT(nx_application_define); /*首先创建模块应用*/




UINT app_nx_ip_instance_create() {
    UINT stat = 0;
    /*创建packet pool*/
    stat += nx_packet_pool_create(&g_packet_pool, "NX PACKET POOL",
                                  APP_NX_PACKET_SIZE,
                                  packet_pool_area, APP_NX_PACKET_POOL_SIZE);

    stat += nx_ip_create(&g_loc_ip,
                         "NX IP",
                         APP_NX_IP_ADDR,
                         APP_NX_IP_SUB_MASK,
                         &g_packet_pool,
                         nx_stm32_eth_driver,
                         loc_ip_area,
                         APP_NX_IP_STACK_SIZE,
                         APP_NX_IP_PRIORITY);

    /*ip 相关配置*/
    {
        /* 启用 ARP 并为 IP 实例  提供 ARP 缓存内存。 */
        stat += nx_arp_enable(&g_loc_ip, arp_cache_area, APP_NX_ARP_CACHE_SIZE);
        /* 启用 ICMP */
        stat += nxd_icmp_enable(&g_loc_ip);
        /* 为IP 实例启用 TCP 处理.  */
        stat += nx_tcp_enable(&g_loc_ip);
        /*禁止分包 */
        stat += nx_ip_fragment_disable(&g_loc_ip);
    }

    return stat;
}
/*
*********************************************************************************************************
*                                      内部函数
*********************************************************************************************************
*/











#endif

MQTT 客户端任务线程

/*
 * Copyright (c) 2024-2024,shchl
 *
 * SPDX-License-Identifier: Apache-2.0
 *
 * Change Logs:
 * Date           Author       Notes
 * 2024-4-19     shchl   first version
 *
 * @description mqtt 客户端任务
 */
#include "includes.h"
#include "nxd_mqtt_client.h"

#define QOS0 0
#define QOS1 1
#if 1

#define APP_TASK_NET_MQTT_CLIENT_STACK_SIZE (4096)
#define APP_TASK_NET_MQTT_CLIENT_PRIORITY (10)

#define APP_MQTT_CLIENT_STACK_SIZE 4096
#define APP_MQTT_CLIENT_PRIORITY 2

/* MQTT 相关宏定义*/
#define MQTT_CLIENT_ID "client_id_stm32"
#define MQTT_SERVER_IP_ADDR IP_ADDRESS(192, 168, 8, 2)

#define MQTT_KEEP_ALIVE_TIMER 300
#define STRLEN(p) (sizeof(p) - 1)
#define MQTT_SUB_TOPIC "/sub/topic1"
#define MQTT_SUB_TOPIC2 "/sub/topic2"
#define MQTT_PUB_TOPIC "/pub/topic"

//-------------------------------事件标志位
#define MQTT_APP_CONNECT_ERROR_EVENT ((1) << 0)      // 客户端连接失败事件
#define MQTT_APP_SUB_TOPIC_RECEIVED_EVENT ((1) << 1) // 客户端订阅主题接收到数据事件
#define MQTT_APP_CLIENT_DISCONNECT_EVENT ((1) << 2)  // 客户端断开连接

#define MATT_APP_ALL_EVENT (MQTT_APP_CONNECT_ERROR_EVENT | MQTT_APP_SUB_TOPIC_RECEIVED_EVENT \
                                                               |MQTT_APP_CLIENT_DISCONNECT_EVENT)

/*
*******************************************************************************************************
*                               外部引入变量
*******************************************************************************************************
*/

/*
*******************************************************************************************************
*                               变量
*******************************************************************************************************
*/

TX_EVENT_FLAGS_GROUP mqtt_app_event_group; /*mqtt 事件组*/
NXD_ADDRESS mqtt_server_ip;                /*mqtt 服务地址*/

/*
*********************************************************************************************************
*                                       静态全局变量
*********************************************************************************************************
*/
static NXD_MQTT_CLIENT mqtt_client; /*声明 mqtt 客户端*/
static TX_THREAD net_mqtt_client_thread;
static void *area;

static void *mqtt_client_area;
static UCHAR message_buffer[NXD_MQTT_MAX_MESSAGE_LENGTH];
static UCHAR topic_buffer[NXD_MQTT_MAX_TOPIC_NAME_LENGTH];
UINT topic_actual_len, topic_actual_msg_len;

/*
*********************************************************************************************************
*                                      函数声明
*********************************************************************************************************
*/
static void net_mqtt_client_thread_entry(ULONG par);

static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client);

static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count);


/*
*********************************************************************************************************
*                                      外部函数
*********************************************************************************************************
*/
int app_task_net_mqtt_client_create() {

    area = app_malloc(APP_TASK_NET_MQTT_CLIENT_STACK_SIZE);
    tx_thread_create(&net_mqtt_client_thread,             /* 任务控制块地址 */
                     "app net server",                    /* 任务名 */
                     net_mqtt_client_thread_entry,        /* 启动任务函数地址 */
                     0,                                   /* 传递给任务的参数 */
                     area,                                /* 堆栈基地址 */
                     APP_TASK_NET_MQTT_CLIENT_STACK_SIZE, /* 堆栈空间大小 */
                     APP_TASK_NET_MQTT_CLIENT_PRIORITY,   /* 任务优先级*/
                     APP_TASK_NET_MQTT_CLIENT_PRIORITY,   /* 任务抢占阀值 */
                     TX_NO_TIME_SLICE,                    /* 不开启时间片 */
                     TX_AUTO_START);                      /* 创建后立即启动 */

    /*内存分配*/
    mqtt_client_area = app_malloc(APP_MQTT_CLIENT_STACK_SIZE);

    /*创建mqtt 事件*/
    tx_event_flags_create(&mqtt_app_event_group, "mqtt event");

    /*服务端连接信息配置*/
    mqtt_server_ip.nxd_ip_version = 4;
    mqtt_server_ip.nxd_ip_address.v4 = MQTT_SERVER_IP_ADDR;

    return TX_SUCCESS;
}

TX_THREAD_EXPORT(app_task_net_mqtt_client_create);

/*
*********************************************************************************************************
*                                      内部函数
*********************************************************************************************************
*/
static inline UINT mqtt_client_sub_topic(CHAR *topic, UINT qos) {
    return nxd_mqtt_client_subscribe(&mqtt_client, topic, strlen(topic), qos);
}
static inline UINT mqtt_client_unsub_topic(CHAR *topic) {
    return   nxd_mqtt_client_unsubscribe(&mqtt_client,topic,strlen(topic));
}

static void net_mqtt_client_thread_entry(ULONG par) {


    UINT status = 0;
    ULONG actual_status = 0;
    ULONG actual_event_flag;
    do {
        /* 等待 1 秒钟,让 内部 IP 线程完成其初始化。. */
        status = nx_ip_status_check(&g_loc_ip,
                                    NX_IP_INITIALIZE_DONE,
                                    &actual_status,
                                    NX_IP_PERIODIC_RATE);
    } while (status != NX_SUCCESS);
    /* 创建客户端实例. */
    status = nxd_mqtt_client_create(&mqtt_client,
                                    "mqtt client",
                                    MQTT_CLIENT_ID, strlen(MQTT_CLIENT_ID),
                                    &g_loc_ip, &g_packet_pool,
                                    mqtt_client_area,
                                    APP_MQTT_CLIENT_STACK_SIZE,
                                    APP_MQTT_CLIENT_PRIORITY,
                                    NX_NULL, 0);

    /* 设置通知回调函数 */
    // 客户端断开连接通知
    nxd_mqtt_client_disconnect_notify_set(&mqtt_client, mqtt_disconnect_notify);
    // 设置接收到数据通知回调
    nxd_mqtt_client_receive_notify_set(&mqtt_client, mqtt_receive_notify);
    while (1) {
        /* 开始与服务器的连接*/
        status = nxd_mqtt_client_connect(&mqtt_client,
                                         &mqtt_server_ip,
                                         NXD_MQTT_PORT,
                                         MQTT_KEEP_ALIVE_TIMER,
                                         0,
                                         NX_WAIT_FOREVER);

        if (status == NXD_MQTT_CONNECT_FAILURE) {

            logError("mqtt unable connect mqtt server[%d,%d,%d,%d:%d]", NX_IP_FMT(MQTT_SERVER_IP_ADDR), NXD_MQTT_PORT);

            tx_thread_sleep(1000);
            continue;
        }
        /* 订阅主题 */
        status = mqtt_client_sub_topic(MQTT_SUB_TOPIC, QOS0);
        status = mqtt_client_sub_topic(MQTT_SUB_TOPIC2, QOS0);
        switch (status) {
            case 0:
                break;
            case NXD_MQTT_NOT_CONNECTED:
            default:
                logError("nxd_mqtt_client_subscribe error:%#x", status);
                tx_thread_sleep(200);
                continue;
        }


        while (1) {
            status = tx_event_flags_get(&mqtt_app_event_group, MATT_APP_ALL_EVENT, TX_OR_CLEAR, &actual_event_flag,
                                        100);
            if (status == TX_SUCCESS) {/*接收来自topic 的数据*/
                if (MQTT_APP_SUB_TOPIC_RECEIVED_EVENT == actual_event_flag) {

                    status = nxd_mqtt_client_message_get(&mqtt_client,
                                                         topic_buffer, sizeof(topic_buffer), &topic_actual_len,
                                                         message_buffer, sizeof(message_buffer), &topic_actual_msg_len);
                    if (status == NXD_MQTT_SUCCESS) {
                        /*添加结束符*/
                        topic_buffer[topic_actual_len] = 0;
                        message_buffer[topic_actual_msg_len] = 0;
                        logInfo("topic = %s, message = %s", topic_buffer, message_buffer);
                    }
                } else if (actual_event_flag == MQTT_APP_CLIENT_DISCONNECT_EVENT) {

                    logInfo("client disconnect.....");
                    break;
                }

            } else if (status == TX_NO_EVENTS) {/*没有事件通知*/
                // todo

            } else {
                break;
            }

            /*todo 推送状态数据*/
            /*向发布主题推送数据*/
            //    const char *send_data = "this is publish msg";
            //    status += nxd_mqtt_client_publish(&mqtt_client,
            //                                      MQTT_PUB_TOPIC,
            //                                      STRLEN(MQTT_PUB_TOPIC),
            //                                      (CHAR *) send_data,
            //                                      strlen(send_data),
            //                                      0,
            //                                      QOS1,
            //                                      NX_WAIT_FOREVER); /*等待超时*/

        }
        mqtt_client_unsub_topic(MQTT_SUB_TOPIC); /* Now unsubscribe the topic. */
        mqtt_client_unsub_topic(MQTT_SUB_TOPIC2); /* Now unsubscribe the topic. */
        nxd_mqtt_client_disconnect(&mqtt_client); /* Disconnect from the broker. */
    }
    /*删除客户端,释放资源(需要重新创建) */
    nxd_mqtt_client_delete(&mqtt_client);


}

static VOID mqtt_disconnect_notify(NXD_MQTT_CLIENT *client) {
    /**/
    logInfo("mqtt_disconnect_notify :%s", client->nxd_mqtt_client_id);
    tx_event_flags_set(&mqtt_app_event_group, MQTT_APP_CLIENT_DISCONNECT_EVENT, TX_OR);
}

static VOID mqtt_receive_notify(NXD_MQTT_CLIENT *client_ptr, UINT message_count) {
    NX_PARAMETER_NOT_USED(client_ptr);
//    NX_PARAMETER_NOT_USED(message_count); /*接收的消息数*/
    logInfo("rec msg num:%d", message_count);

    tx_event_flags_set(&mqtt_app_event_group, MQTT_APP_SUB_TOPIC_RECEIVED_EVENT, TX_OR);
}

#endif

测试结果(ok)

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/559025.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

CERLAB无人机自主框架: 1-环境搭建

前言&#xff1a;更多更新文章详见我的个人博客主页【MGodmonkeyの世界】 描述&#xff1a;欢迎来到CERLAB无人机自主框架&#xff0c;这是一个用于自主无人飞行器 (UAV) 的多功能模块化框架。该框架包括不同的组件 (模拟器&#xff0c;感知&#xff0c;映射&#xff0c;规划和…

后台管理系统加水印(react)

效果 代码图片 代码 window.waterMark function (config) {var defaultConfig {content: 我是水印,fontSize: 16px,opacity: 0.3,rotate: -15,color: #ADADAD,modalId: J_waterMarkModalByXHMAndDHL,};config Object.assign({}, defaultConfig, config);var existMarkModal…

亚信安全入选中国数据安全市场图谱

近日&#xff0c;全球领先的IT市场研究和咨询公司IDC发布了《IDC Market Glance&#xff1a;中国数据安全市场图谱&#xff0c;2024》报告&#xff08;以下简称“报告”&#xff09;&#xff0c;报告展示了中国数据安全市场的构成和格局&#xff0c;遴选出不同细分市场领域的主…

rabbitmq 使用SAC队列实现顺序消息

rabbitmq 使用SAC队列实现顺序消息 前提 SAC: single active consumer, 是指如果有多个实例&#xff0c;只允许其中一个实例消费&#xff0c;其他实例为空闲 目的 实现消息顺序消费&#xff0c;操作&#xff1a; 创建4个SAC队列,消息的路由key 取队列个数模&#xff0c;这…

java调用讯飞星火认知模型

前往讯飞开发平台选择产品&#xff0c;获取appId、apiKey、APISecret&#xff0c;这里我选择的是v3.0模型。 java后端实现 本项目以及实现了基本的会话功能&#xff0c;小伙伴可以自己扩充其他的例如绘画功能。 注意&#xff1a;星火模型的api使用的是websocket协议&#xf…

C++11(下篇)

文章目录 C111. 模版的可变参数1.1 模版参数包的使用 2. lambda表达式2.1 Lambda表达式语法捕获列表说明 2.2 lambda的底层 3. 包装器3.1 function包装器3.2 bind 4. 线程库4.1 thread类4.2 mutex类4.3 atomic类4.4 condition_variable类 C11 1. 模版的可变参数 C11支持模版的…

数据结构习题-- 相交链表

数据结构习题-- 相交链表 给你两个单链表的头节点 headA 和 headB &#xff0c;请你找出并返回两个单链表相交的起始节点。如果两个链表不存在相交节点&#xff0c;返回 null 如上图&#xff0c;返回c1结点 注意&#xff1a;这两个链表非环形 方法&#xff1a;集合 分析 由…

关于ERA5气压和温度垂直补偿公式的对比情况

1. 气压和温度垂直补偿对比 「谨代表给个人观点&#xff0c;杠精请自测&#xff0c;对对对&#xff0c;好好好&#xff0c;你说啥都对」。 使用2020-2022陆态网GNSS与探空站并址的48个站点实验&#xff0c;以探空站为真值&#xff0c;验证ERA5精度。怎么确定并址请看前面文章…

Django中的实时通信:WebSockets与异步视图的结合

&#x1f47d;发现宝藏 前些天发现了一个巨牛的人工智能学习网站&#xff0c;通俗易懂&#xff0c;风趣幽默&#xff0c;忍不住分享一下给大家。【点击进入巨牛的人工智能学习网站】。 在现代Web应用程序中&#xff0c;实时通信已经成为了必不可少的功能之一。无论是在线聊天、…

UKP3D,出轴 /平面图时,选项中出图比例,绘图比例,打印比例的区别

Q:用户问&#xff0c;轴测图正常&#xff0c;平面图位置不对&#xff0c;这个也需要在xml里面调整吗&#xff1f; 在此&#xff0c;先不回复上述问题&#xff0c;而是解释在出图规则里的选项意思。 1.图框比例&#xff1a;图框比例1:100&#xff0c;例如选中的图幅是A0横式&…

现代图形API综合比较:Vulkan | DirectX | Metal | WebGPU

Vulkan、DirectX、Metal 和 WebGPU 等低级图形 API 正在融合为类似于当前 GPU 构建方式的模型。 图形处理单元 (GPU) 是异步计算单元&#xff0c;可以处理大量数据&#xff0c;例如复杂的网格几何形状、图像纹理、输出帧缓冲区、变换矩阵或你想要计算的任何数据。 NSDT工具推荐…

TFS(淘宝分布式存储引擎

TFS&#xff08;淘宝分布式存储引擎&#xff09; 什么是TFS&#xff1f; ​ 根据淘宝2016年的数据分析&#xff0c;淘宝卖家已经达到900多万&#xff0c;有上十亿的商品。每一个商品有包括大量的图片和文字(平均&#xff1a;15k)&#xff0c;粗略估计下&#xff0c;数据所占的…

编译一个基于debian/ubuntu,centos,arhlinux第三方系统

目录 前言 准备工作 下载linux源码进行编译 linux源码下载 网站 问题 解决办法 编译 可能会遇到的问题 chroot下载debian环境 进入虚拟环境 把chroot的根目录文件打包为.gz文件 编译init文件&#xff08;用于系统启动时的一系列引导&#xff09; 给予文件夹权限 …

pip下载包opencv出错(报错failed building wheel for opencv-python解决方法)

文章目录 1 报错2 原因3 解决方法参考 1 报错 ERROR: Could not build wheels for opencv-python, which is required to install pypr2 原因 版本不兼容的问题,当使用pip install opencv-python命令安装的是最新版本&#xff0c;当前python版本不支持。需要安装当前版本pyth…

34. 【Android教程】菜单:Menu

作为 Android 用户&#xff0c;你一定见过类似这样的页面&#xff1a; 它就是我们今天的主角——菜单&#xff0c;它的使用场景和作用不用多说&#xff0c;几乎每个 App 都会用到它&#xff0c;今天我们就一起来看看 Android 提供的几种菜单类型及用法。 1. 菜单的几种类型 根…

✌粤嵌—2024/4/12—插入区间✌

代码实现&#xff1a; 解题思路&#xff1a;先将数组 newInterval 插入到数组 intervals 的末尾&#xff0c;再转换成合并区间 /*** Return an array of arrays of size *returnSize.* The sizes of the arrays are returned as *returnColumnSizes array.* Note: Both returne…

LabVIEW专栏六、LabVIEW项目

一、梗概 项目&#xff1a;后缀是.lvproj&#xff0c;在实际开发的过程中&#xff0c;一般是要用LabVIEW中的项目来管理代码&#xff0c;也就是说相关的VI或者外部文件&#xff0c;都要放在项目中来管理。 在LabVIEW项目中&#xff0c;是一个互相依赖的整体&#xff0c;可以包…

319_C++_使用QT自定义的对话框,既能选择文件也能选择文件夹,为什么使用QListView和QTreeView来达成目的?

解析 1: 在 Qt 中,QFileDialog::setOption 方法用于设置文件对话框的一些选项,以改变其行为或外观。QFileDialog::DontUseNativeDialog 是这些选项之一,当设置为 true 时,它会告诉 QFileDialog 不要使用操作系统提供的原生文件对话框,而是使用 Qt 自己实现的对话框样式。…

js作业微博发言与轮播(有瑕疵)

微博 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta name"viewport" content"widthdevice-width, initial-scale1.0"><meta http-equiv"X-UA-Compatible" content&q…

H5 台球猜位置小游戏

刷到抖音有人这样玩&#xff0c;就写了一个这样的小游戏练习一下H5的知识点。 小游戏预览 w(&#xff9f;Д&#xff9f;)w 不开挂越急越完成不了&#xff0c;&#x1f47f;确认15次也没全对… 知识点 获取坐标位置的DOM元素&#xff0c;感觉应该是新的吧&#xff0c;以前的…