以下是一个简单的多进程DPDK收发代码示例,其中使用了两个进程:
进程1:用于发送数据包 进程2:用于接收并处理数据包
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <stdint.h>
#include <errno.h>
#include <sys/queue.h>
#include <rte_eal.h>
#include <rte_ethdev.h>
#include <rte_mbuf.h>
#define RX_RING_SIZE 1024
#define TX_RING_SIZE 1024
#define NUM_MBUFS 8191
#define MBUF_CACHE_SIZE 250
#define BURST_SIZE 32
static const struct rte_eth_conf port_conf_default = {
.rxmode = { .max_rx_pkt_len = ETHER_MAX_LEN }
};
static struct rte_mempool *mbuf_pool = NULL;
static void
send_packets(int port_id)
{
struct rte_mbuf *m;
int i, ret;
for (i = 0; i < BURST_SIZE; i++) {
m = rte_pktmbuf_alloc(mbuf_pool);
if (m == NULL) {
printf("Could not allocate mbuf\n");
return;
}
/* initialize the packet */
char data[] = "Hello World!";
memcpy(rte_pktmbuf_append(m, sizeof(data)), data, sizeof(data));
ret = rte_eth_tx_burst(port_id, 0, &m, 1);
if (ret != 1) {
rte_pktmbuf_free(m);
printf("Error sending packet\n");
return;
}
}
}
static void
process_packet(struct rte_mbuf *pkt)
{
/* do some processing on the packet */
printf("Received packet of length %u\n", pkt->pkt_len);
}
static void
receive_packets(int port_id)
{
struct rte_mbuf *pkts_burst[BURST_SIZE];
uint16_t nb_rx;
int i;
while (1) {
nb_rx = rte_eth_rx_burst(port_id, 0, pkts_burst, BURST_SIZE);
if (nb_rx == 0)
continue;
for (i = 0; i < nb_rx; i++) {
process_packet(pkts_burst[i]);
rte_pktmbuf_free(pkts_burst[i]);
}
}
}
int
main(int argc, char **argv)
{
int ret;
unsigned nb_ports;
uint16_t port_id;
uint16_t nb_rxd = RX_RING_SIZE;
uint16_t nb_txd = TX_RING_SIZE;
int socket_id = rte_socket_id();
/* initialize EAL */
ret = rte_eal_init(argc, argv);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Error with EAL initialization\n");
argc -= ret;
argv += ret;
/* create memory pool */
mbuf_pool = rte_pktmbuf_pool_create("MBUF_POOL", NUM_MBUFS * rte_lcore_count(), MBUF_CACHE_SIZE, 0, RTE_MBUF_DEFAULT_BUF_SIZE, socket_id);
if (mbuf_pool == NULL)
rte_exit(EXIT_FAILURE, "Cannot create mbuf pool\n");
/* initialize ports */
nb_ports = rte_eth_dev_count_avail();
for (port_id = 0; port_id < nb_ports; port_id++) {
struct rte_eth_conf port_conf = port_conf_default;
struct rte_eth_dev_info dev_info;
uint16_t nb_tx_queues, nb_rx_queues;
/* get device info */
rte_eth_dev_info_get(port_id, &dev_info);
nb_tx_queues = dev_info.max_tx_queues;
nb_rx_queues = dev_info.max_rx_queues;
/* configure device */
ret = rte_eth_dev_configure(port_id, nb_rx_queues, nb_tx_queues, &port_conf);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot configure device: err=%d, port=%u\n", ret, port_id);
/* allocate and set up RX queue */
ret = rte_eth_rx_queue_setup(port_id, 0, nb_rxd, socket_id, NULL, mbuf_pool);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot allocate RX buffer\n");
/* allocate and set up TX queue */
ret = rte_eth_tx_queue_setup(port_id, 0, nb_txd, socket_id, NULL);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot allocate TX buffer\n");
/* start device */
ret = rte_eth_dev_start(port_id);
if (ret < 0)
rte_exit(EXIT_FAILURE, "Cannot start device\n");
}
/* check if we have enough ports to do I/O */
if (nb_ports < 2)
rte_exit(EXIT_FAILURE, "Error: Not enough ports for I/O\n");
/* run send/receive processes on different cores */
rte_eal_remote_launch((lcore_function_t *)send_packets, (void *)&port_id, 1);
rte_eal_remote_launch((lcore_function_t *)receive_packets, (void *)&port_id, 2);
rte_eal_mp_wait_lcore();
return 0;
}