以下是DPDK多线程接收数据包的示例代码:
#include <errno.h>
#include <inttypes.h>
#include <pthread.h>
#include <signal.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
/* Number of descriptors requested for each queue ring during port setup. */
#define RX_RING_SIZE 1024
/* Number of RX and TX queues configured on the port. */
#define NUM_RX_QUEUES 2
#define NUM_TX_QUEUES 2
/* Number of mbufs in the packet pool handed to the RX queues. */
#define NUM_MBUFS 8191
/* Per-lcore object cache size of the mbuf pool. */
#define MBUF_CACHE_SIZE 250
/* Maximum number of packets fetched by one rte_eth_rx_burst() call. */
#define MAX_PKT_BURST 32
/*
 * Shutdown request flag. It is written from the signal handler and read by
 * the polling loops of several threads, so it is an atomic object rather
 * than a plain volatile: volatile does not provide inter-thread
 * synchronisation. Static storage makes it start out as false.
 */
static atomic_bool force_quit;

/*
 * Signal handler: requests shutdown on SIGINT or SIGTERM by setting
 * force_quit. Any other signal number is ignored.
 */
static void signal_handler(int signum) {
    if (signum == SIGINT || signum == SIGTERM) {
        atomic_store(&force_quit, true);
    }
}
/*
 * Configure and start one Ethernet port for polled reception.
 *
 * Sets up NUM_RX_QUEUES RX queues (all fed from mbuf_pool) and
 * NUM_TX_QUEUES TX queues, starts the port, prints its MAC address and
 * enables promiscuous mode. No RX interrupts are configured here; queues
 * are expected to be read with rte_eth_rx_burst().
 *
 * NOTE(review): no RX multi-queue mode (e.g. RSS) is requested, so how
 * traffic is spread across the RX queues is left to the device defaults.
 *
 * port       port identifier to initialise
 * mbuf_pool  pool the RX queues allocate their packet buffers from
 *
 * Returns 0 on success, -EINVAL if the port id is not valid, otherwise the
 * non-zero code returned by the first DPDK call that failed.
 */
static int init_port(uint16_t port, struct rte_mempool *mbuf_pool) {
    struct rte_eth_conf port_conf = {
        .rxmode = {
            .max_rx_pkt_len = RTE_ETHER_MAX_LEN,
            .split_hdr_size = 0,
            .offloads = DEV_RX_OFFLOAD_CHECKSUM,
        },
        .txmode = {
            .mq_mode = ETH_MQ_TX_NONE,
        },
    };
    const uint16_t rx_rings = NUM_RX_QUEUES;
    const uint16_t tx_rings = NUM_TX_QUEUES;
    int retval;
    uint16_t q;

    if (!rte_eth_dev_is_valid_port(port)) {
        return -EINVAL;
    }

    /* Configure the Ethernet device; nothing else may proceed if this fails. */
    retval = rte_eth_dev_configure(port, rx_rings, tx_rings, &port_conf);
    if (retval != 0) {
        return retval;
    }

    /* Allocate the queue rings on the socket the device is attached to. */
    const int socket_id = rte_eth_dev_socket_id(port);

    /* Each return code is checked on its own so the caller sees the real error. */
    for (q = 0; q < rx_rings; q++) {
        retval = rte_eth_rx_queue_setup(port, q, RX_RING_SIZE,
                                        socket_id, NULL, mbuf_pool);
        if (retval != 0) {
            return retval;
        }
    }

    /* The TX rings reuse RX_RING_SIZE; no separate TX size is defined. */
    for (q = 0; q < tx_rings; q++) {
        retval = rte_eth_tx_queue_setup(port, q, RX_RING_SIZE,
                                        socket_id, NULL);
        if (retval != 0) {
            return retval;
        }
    }

    /* Start the Ethernet port. */
    retval = rte_eth_dev_start(port);
    if (retval != 0) {
        return retval;
    }

    /* Display the port MAC address. */
    struct rte_ether_addr addr;
    rte_eth_macaddr_get(port, &addr);
    printf("Port %u MAC: %02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02" PRIx8 ":%02"
           PRIx8 "\n",
           (unsigned)port,
           addr.addr_bytes[0], addr.addr_bytes[1],
           addr.addr_bytes[2], addr.addr_bytes[3],
           addr.addr_bytes[4], addr.addr_bytes[5]);

    /* Enable RX in promiscuous mode for the Ethernet device. */
    rte_eth_promiscuous_enable(port);
    return 0;
}
/* Port served by every RX worker thread. */
static const uint16_t rx_port_id = 0;
/* Guards the one-time port setup shared by all RX worker threads. */
static pthread_once_t rx_port_once = PTHREAD_ONCE_INIT;
/* Set to 1 by rx_port_setup_once() when the port is ready for polling. */
static int rx_port_ready;

/*
 * One-time setup: create the mbuf pool and initialise the port.
 *
 * This must not run once per worker: the pool is created under a fixed
 * name, so a second creation attempt fails, and the port must not be
 * reconfigured while another worker is already polling it.
 * On failure a message is printed and rx_port_ready stays 0.
 */
static void rx_port_setup_once(void) {
    struct rte_mempool *mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS,
                                                            MBUF_CACHE_SIZE, 0,
                                                            RTE_MBUF_DEFAULT_BUF_SIZE,
                                                            SOCKET_ID_ANY);
    if (mbuf_pool == NULL) {
        printf("Cannot create mbuf pool\n");
        return;
    }
    if (init_port(rx_port_id, mbuf_pool) != 0) {
        printf("Cannot initialize port %" PRIu16 "\n", rx_port_id);
        return;
    }
    rx_port_ready = 1;
}

/*
 * Worker thread body: polls one RX queue of port 0 until force_quit is set.
 *
 * arg  the RX queue index, passed as an integer cast to a pointer
 *
 * The first worker to arrive performs the shared port setup; the others
 * wait for it inside pthread_once(). If that setup failed, the process is
 * terminated with EXIT_FAILURE. Always returns NULL otherwise.
 */
static void *rx_thread(void *arg) {
    const uint16_t queue_id = (uint16_t)(intptr_t)arg;

    if (pthread_once(&rx_port_once, rx_port_setup_once) != 0 || !rx_port_ready) {
        exit(EXIT_FAILURE);
    }

    while (!force_quit) {
        struct rte_mbuf *pkts_burst[MAX_PKT_BURST];
        const uint16_t nb_rx = rte_eth_rx_burst(rx_port_id, queue_id,
                                                pkts_burst, MAX_PKT_BURST);

        if (nb_rx == 0) {
            /* Back off only when the queue is idle, so a busy queue is drained without delay. */
            usleep(10);
            continue;
        }

        for (uint16_t i = 0; i < nb_rx; i++) {
            /* Do something with the received packet */
        }
        rte_pktmbuf_free_bulk(pkts_burst, nb_rx);
    }
    return NULL;
}
/*
 * Program entry point.
 *
 * Initialises the EAL, parses the mask argument that follows the EAL
 * options, installs the SIGINT/SIGTERM handlers and starts one rx_thread
 * per selected RX queue. It then idles until shutdown is requested, waits
 * for the workers to finish and releases the EAL.
 *
 * Although the argument is called a port mask, bit i of the mask selects
 * RX queue i (for i < NUM_RX_QUEUES); this function does not use it to
 * select ports.
 *
 * Returns 0 on a clean shutdown and -1 on EAL or argument errors; exits
 * with EXIT_FAILURE if a worker thread cannot be created.
 */
int main(int argc, char **argv) {
    int ret;

    /* Initialize DPDK EAL */
    ret = rte_eal_init(argc, argv);
    if (ret < 0) {
        return -1;
    }
    /* Skip the arguments consumed by the EAL. */
    argc -= ret;
    argv += ret;

    if (argc <= 1) {
        printf("Usage: %s <port-mask>\n", argv[0]);
        rte_eal_cleanup();
        return -1;
    }

    /* Parse the mask strictly: reject trailing junk, overflow and zero. */
    char *end = NULL;
    errno = 0;
    const unsigned long parsed = strtoul(argv[1], &end, 10);
    if (errno != 0 || end == argv[1] || *end != '\0' ||
        parsed == 0 || parsed > UINT16_MAX) {
        printf("Invalid port mask\n");
        rte_eal_cleanup();
        return -1;
    }
    const uint16_t port_mask = (uint16_t)parsed;

    /* Install signal handlers for graceful shutdown */
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    /* Create one worker thread per selected RX queue. */
    pthread_t workers[NUM_RX_QUEUES];
    unsigned int nb_workers = 0;
    for (uint16_t i = 0; i < NUM_RX_QUEUES; i++) {
        if ((port_mask & (1u << i)) == 0) {
            continue;
        }
        ret = pthread_create(&workers[nb_workers], NULL, rx_thread, (void *)(intptr_t)i);
        if (ret != 0) {
            printf("Cannot create RX thread for queue %" PRIu16 "\n", i);
            exit(EXIT_FAILURE);
        }
        nb_workers++;
        printf("Created RX thread for queue %" PRIu16 "\n", i);
    }

    while (!force_quit) {
        /* Do something in the main thread */
        usleep(10); /* sleep to reduce CPU usage */
    }

    /* Wait for the workers to stop polling before the EAL is torn down. */
    for (unsigned int w = 0; w < nb_workers; w++) {
        pthread_join(workers[w], NULL);
    }

    rte_eal_cleanup();
    return 0;
}
该示例代码在端口0上配置了两个RX队列和两个TX队列,并为命令行掩码中置位的每个RX队列启动一个单独的线程来轮询接收数据包。需要注意的是,虽然该参数被称为端口掩码,但在代码中它的每一位对应的是端口0上的一个RX队列,而不是不同的端口。DPDK并不强制要求每个接收队列使用单独的线程(一个线程也可以轮询多个队列),但为每个队列分配独立的线程或lcore是有效利用多核处理器的常见做法。该代码还安装了信号处理程序,在收到SIGINT或SIGTERM时设置退出标志,以便优雅地关闭应用程序。