ZBLOG

dpdk多进程收发的代码,其中接收进程4个,发送进程4个

以下是一个基本的DPDK多进程收发代码示例,其中有4个接收进程和4个发送进程:

#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <inttypes.h>
#include <signal.h>
#include <unistd.h>

#include <rte_eal.h>
#include <rte_common.h>
#include <rte_mbuf.h>
#include <rte_ethdev.h>
#include <rte_cycles.h>

#define RX_RING_SIZE 2048
#define TX_RING_SIZE 2048

#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32

static volatile bool force_quit;

struct worker_args {
    uint16_t port_id;
    uint16_t queue_id;
};

static void signal_handler(int signum) {
    if (signum == SIGINT || signum == SIGTERM) {
        printf("received signal %d, exiting...\n", signum);
        force_quit = true;
    }
}

static void rx_packets(struct worker_args *args) {
    struct rte_eth_dev_info dev_info;
    rte_eth_dev_info_get(args->port_id, &dev_info);

    const uint16_t nb_rx_queues = dev_info.nb_rx_queues;

    uint16_t rx_queues[nb_rx_queues];
    for (int i = 0; i < nb_rx_queues; ++i) {
        rx_queues[i] = i;
    }

    printf("worker %d/%d receiving packets on port %d, queue %d\n",
        args->queue_id+1, nb_rx_queues, args->port_id, args->queue_id);

    struct rte_mempool *mbuf_pool =
        rte_pktmbuf_pool_create("mbuf_pool", NUM_MBUFS, MBUF_CACHE_SIZE, 0,
            RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
    if (mbuf_pool == NULL) {
        rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %s\n", rte_strerror(rte_errno));
    }

    struct rte_eth_rxconf rx_conf;
    memset(&rx_conf, 0, sizeof(rx_conf));
    rx_conf.rx_drop_en = 1;

    if (rte_eth_dev_rx_queue_setup(args->port_id, args->queue_id, RX_RING_SIZE,
            rte_eth_dev_socket_id(args->port_id), &rx_conf, mbuf_pool) < 0) {
        rte_exit(EXIT_FAILURE, "cannot setup rx queue %d for port %d\n",
            args->queue_id, args->port_id);
    }

    struct rte_mbuf *pkts_burst[BURST_SIZE];

    while (!force_quit) {
        const uint16_t nb_rx = rte_eth_rx_burst(args->port_id, args->queue_id, pkts_burst, BURST_SIZE);
        for (int i = 0; i < nb_rx; ++i) {
            // do something with the packet
            rte_pktmbuf_free(pkts_burst[i]);
        }
    }

    rte_pktmbuf_pool_free(mbuf_pool);
}

static void tx_packets(struct worker_args *args) {
    struct rte_eth_dev_info dev_info;
    rte_eth_dev_info_get(args->port_id, &dev_info);

    const uint16_t nb_tx_queues = dev_info.nb_tx_queues;

    uint16_t tx_queues[nb_tx_queues];
    for (int i = 0; i < nb_tx_queues; ++i) {
        tx_queues[i] = i;
    }

    printf("worker %d/%d sending packets on port %d, queue %d\n",
        args->queue_id+1, nb_tx_queues, args->port_id, args->queue_id);

    struct rte_mempool *mbuf_pool =
        rte_pktmbuf_pool_create("mbuf_pool", NUM_MBUFS, MBUF_CACHE_SIZE, 0,
            RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
    if (mbuf_pool == NULL) {
        rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %s\n", rte_strerror(rte_errno));
    }

    struct rte_eth_txconf tx_conf;
    memset(&tx_conf, 0, sizeof(tx_conf));

    if (rte_eth_dev_tx_queue_setup(args->port_id, args->queue_id, TX_RING_SIZE,
            rte_eth_dev_socket_id(args->port_id), &tx_conf) < 0) {
        rte_exit(EXIT_FAILURE, "cannot setup tx queue %d for port %d\n",
            args->queue_id, args->port_id);
    }

    struct rte_mbuf *pkts_burst[BURST_SIZE];

    while (!force_quit) {
        const uint16_t nb_tx = rte_eth_tx_burst(args->port_id, args->queue_id, pkts_burst, BURST_SIZE);
        for (int i = 0; i < nb_tx; ++i) {
            // do something with the packet
            rte_pktmbuf_free(pkts_burst[i]);
        }
    }

    rte_pktmbuf_pool_free(mbuf_pool);
}

static void start_workers(uint16_t port_id) {
    struct rte_eth_dev_info dev_info;
    rte_eth_dev_info_get(port_id, &dev_info);

    const uint16_t nb_rx_queues = dev_info.nb_rx_queues;
    const uint16_t nb_tx_queues = dev_info.nb_tx_queues;

    if (nb_rx_queues != 4 || nb_tx_queues != 4) {
        rte_exit(EXIT_FAILURE, "expected 4 rx and 4 tx queues, got %d rx and %d tx\n",
            nb_rx_queues, nb_tx_queues);
    }

    struct worker_args rx_args[nb_rx_queues];
    struct worker_args tx_args[nb_tx_queues];

    for (int i = 0; i < nb_rx_queues; ++i) {
        rx_args[i].port_id = port_id;
        rx_args[i].queue_id = i;

        rte_eal_remote_launch((lcore_function_t *)rx_packets, &rx_args[i], i+1);
    }

    for (int i = 0; i < nb_tx_queues; ++i) {
        tx_args[i].port_id = port_id;
        tx_args[i].queue_id = i;

        rte_eal_remote_launch((lcore_function_t *)tx_packets, &tx_args[i], i+1+nb_rx_queues);
    }
}

int main(int argc, char *argv[]) {
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    int ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "cannot init EAL: %s\n", rte_strerror(rte_errno));
    }

    argc -= ret;
    argv += ret;

    if (argc != 2) {
        rte_exit(EXIT_FAILURE, "usage: %s <port-id>\n", argv[0]);
    }

    const uint16_t port_id = atoi(argv[1]);

    if (!rte_eth_dev_is_valid_port(port_id)) {
        rte_exit(EXIT_FAILURE, "invalid port id %d\n", port_id);
    }

    ret = rte_eth_dev_configure(port_id, 4, 4, NULL);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "cannot configure port %d: %s\n",
            port_id, rte_strerror(-ret));
    }

    ret = rte_eth_dev_start(port_id);
    if (ret < 0) {
        rte_exit(EXIT_FAILURE, "cannot start port %d: %s\n",
            port_id, rte_strerror(-ret));
    }

    start_workers(port_id);

    rte_eth_dev_stop(port_id);

    return 0;
}

该程序可以在多个物理核心上同时运行,每个核心都运行一个DPDK进程。在启动前,您需要指定一个可用的网络接口端口ID。该程序将为指定的端口配置4个接收队列和4个发送队列。

请注意,此代码示例仅提供基本框架,您可能需要根据自己的需求进行修改。

本站部分文章来源于网络,版权归原作者所有,如有侵权请联系站长删除。
转载请注明出处:https://sdn.0voice.com/?id=137

分享:
扫描分享到社交APP
上一篇
下一篇
发表列表
游客 游客
此处应有掌声~
评论列表

还没有评论,快来说点什么吧~

联系我们

在线咨询: 点击这里给我发消息

微信号:3007537140

上班时间: 10:30-22:30

关注我们
x

注册

已经有帐号?