以下是一个基本的DPDK多核收发代码示例:程序在同一个DPDK进程内启动4个接收工作线程和4个发送工作线程,每个工作线程运行在一个独立的逻辑核心(lcore)上,并各自负责一个队列:
#include <errno.h>
#include <inttypes.h>
#include <signal.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <rte_common.h>
#include <rte_cycles.h>
#include <rte_eal.h>
#include <rte_errno.h>
#include <rte_ethdev.h>
#include <rte_launch.h>
#include <rte_lcore.h>
#include <rte_mbuf.h>
#include <rte_mempool.h>
/* Number of descriptors requested for each RX queue. */
#define RX_RING_SIZE 2048
/* Number of descriptors requested for each TX queue. */
#define TX_RING_SIZE 2048
/* Number of mbufs in each packet buffer pool. */
#define NUM_MBUFS 8191
/* Per-lcore cache size passed to rte_pktmbuf_pool_create(). */
#define MBUF_CACHE_SIZE 250
/* Maximum number of packets handled per RX/TX burst call. */
#define BURST_SIZE 32
/* Set by signal_handler(); the polling loops exit once it becomes true. */
static volatile bool force_quit;
/* Argument handed to each worker: identifies the one queue it services. */
struct worker_args {
/* Ethernet port the worker operates on. */
uint16_t port_id;
/* Index of the queue on that port serviced by the worker. */
uint16_t queue_id;
};
/*
 * Handler for SIGINT/SIGTERM: reports the signal on stdout and sets
 * force_quit so that the polling loops terminate. Other signals are ignored.
 *
 * Only async-signal-safe operations are used. printf() is not one of them:
 * if the signal interrupts another stdio call, stdio's internal state can be
 * corrupted or its lock can deadlock. The message is therefore formatted by
 * hand and emitted with write().
 */
static void signal_handler(int signum) {
    static const char head[] = "received signal ";
    static const char tail[] = ", exiting...\n";

    if (signum != SIGINT && signum != SIGTERM) {
        return;
    }

    /* write() may clobber errno of the code that was interrupted. */
    const int saved_errno = errno;

    /* Render the signal number as decimal, filling the buffer backwards. */
    char digits[12];
    size_t pos = sizeof(digits);
    unsigned int value = (unsigned int)signum;
    do {
        digits[--pos] = (char)('0' + value % 10);
        value /= 10;
    } while (value != 0);

    /* Best effort: nothing useful can be done if stdout is not writable. */
    ssize_t rc = write(STDOUT_FILENO, head, sizeof(head) - 1);
    rc = write(STDOUT_FILENO, digits + pos, sizeof(digits) - pos);
    rc = write(STDOUT_FILENO, tail, sizeof(tail) - 1);
    (void)rc;

    force_quit = true;
    errno = saved_errno;
}
static void rx_packets(struct worker_args *args) {
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(args->port_id, &dev_info);
const uint16_t nb_rx_queues = dev_info.nb_rx_queues;
uint16_t rx_queues[nb_rx_queues];
for (int i = 0; i < nb_rx_queues; ++i) {
rx_queues[i] = i;
}
printf("worker %d/%d receiving packets on port %d, queue %d\n",
args->queue_id+1, nb_rx_queues, args->port_id, args->queue_id);
struct rte_mempool *mbuf_pool =
rte_pktmbuf_pool_create("mbuf_pool", NUM_MBUFS, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %s\n", rte_strerror(rte_errno));
}
struct rte_eth_rxconf rx_conf;
memset(&rx_conf, 0, sizeof(rx_conf));
rx_conf.rx_drop_en = 1;
if (rte_eth_dev_rx_queue_setup(args->port_id, args->queue_id, RX_RING_SIZE,
rte_eth_dev_socket_id(args->port_id), &rx_conf, mbuf_pool) < 0) {
rte_exit(EXIT_FAILURE, "cannot setup rx queue %d for port %d\n",
args->queue_id, args->port_id);
}
struct rte_mbuf *pkts_burst[BURST_SIZE];
while (!force_quit) {
const uint16_t nb_rx = rte_eth_rx_burst(args->port_id, args->queue_id, pkts_burst, BURST_SIZE);
for (int i = 0; i < nb_rx; ++i) {
// do something with the packet
rte_pktmbuf_free(pkts_burst[i]);
}
}
rte_pktmbuf_pool_free(mbuf_pool);
}
static void tx_packets(struct worker_args *args) {
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(args->port_id, &dev_info);
const uint16_t nb_tx_queues = dev_info.nb_tx_queues;
uint16_t tx_queues[nb_tx_queues];
for (int i = 0; i < nb_tx_queues; ++i) {
tx_queues[i] = i;
}
printf("worker %d/%d sending packets on port %d, queue %d\n",
args->queue_id+1, nb_tx_queues, args->port_id, args->queue_id);
struct rte_mempool *mbuf_pool =
rte_pktmbuf_pool_create("mbuf_pool", NUM_MBUFS, MBUF_CACHE_SIZE, 0,
RTE_MBUF_DEFAULT_BUF_SIZE, rte_socket_id());
if (mbuf_pool == NULL) {
rte_exit(EXIT_FAILURE, "cannot create mbuf pool: %s\n", rte_strerror(rte_errno));
}
struct rte_eth_txconf tx_conf;
memset(&tx_conf, 0, sizeof(tx_conf));
if (rte_eth_dev_tx_queue_setup(args->port_id, args->queue_id, TX_RING_SIZE,
rte_eth_dev_socket_id(args->port_id), &tx_conf) < 0) {
rte_exit(EXIT_FAILURE, "cannot setup tx queue %d for port %d\n",
args->queue_id, args->port_id);
}
struct rte_mbuf *pkts_burst[BURST_SIZE];
while (!force_quit) {
const uint16_t nb_tx = rte_eth_tx_burst(args->port_id, args->queue_id, pkts_burst, BURST_SIZE);
for (int i = 0; i < nb_tx; ++i) {
// do something with the packet
rte_pktmbuf_free(pkts_burst[i]);
}
}
rte_pktmbuf_pool_free(mbuf_pool);
}
static void start_workers(uint16_t port_id) {
struct rte_eth_dev_info dev_info;
rte_eth_dev_info_get(port_id, &dev_info);
const uint16_t nb_rx_queues = dev_info.nb_rx_queues;
const uint16_t nb_tx_queues = dev_info.nb_tx_queues;
if (nb_rx_queues != 4 || nb_tx_queues != 4) {
rte_exit(EXIT_FAILURE, "expected 4 rx and 4 tx queues, got %d rx and %d tx\n",
nb_rx_queues, nb_tx_queues);
}
struct worker_args rx_args[nb_rx_queues];
struct worker_args tx_args[nb_tx_queues];
for (int i = 0; i < nb_rx_queues; ++i) {
rx_args[i].port_id = port_id;
rx_args[i].queue_id = i;
rte_eal_remote_launch((lcore_function_t *)rx_packets, &rx_args[i], i+1);
}
for (int i = 0; i < nb_tx_queues; ++i) {
tx_args[i].port_id = port_id;
tx_args[i].queue_id = i;
rte_eal_remote_launch((lcore_function_t *)tx_packets, &tx_args[i], i+1+nb_rx_queues);
}
}
int main(int argc, char *argv[]) {
signal(SIGINT, signal_handler);
signal(SIGTERM, signal_handler);
int ret = rte_eal_init(argc, argv);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "cannot init EAL: %s\n", rte_strerror(rte_errno));
}
argc -= ret;
argv += ret;
if (argc != 2) {
rte_exit(EXIT_FAILURE, "usage: %s <port-id>\n", argv[0]);
}
const uint16_t port_id = atoi(argv[1]);
if (!rte_eth_dev_is_valid_port(port_id)) {
rte_exit(EXIT_FAILURE, "invalid port id %d\n", port_id);
}
ret = rte_eth_dev_configure(port_id, 4, 4, NULL);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "cannot configure port %d: %s\n",
port_id, rte_strerror(-ret));
}
ret = rte_eth_dev_start(port_id);
if (ret < 0) {
rte_exit(EXIT_FAILURE, "cannot start port %d: %s\n",
port_id, rte_strerror(-ret));
}
start_workers(port_id);
rte_eth_dev_stop(port_id);
return 0;
}
该程序在单个DPDK进程内运行,通过 rte_eal_remote_launch 在多个逻辑核心(lcore)上同时启动工作线程:除主核心外,还需要至少8个可用的 lcore(例如使用 EAL 参数 -l 0-8)。启动时,您需要在 EAL 参数之后(即 “--” 之后)指定一个可用的网络接口端口ID。该程序将为指定的端口配置4个接收队列和4个发送队列。
请注意,此代码示例仅提供基本框架,您可能需要根据自己的需求进行修改。