Message ID | 1409668221-24707-1-git-send-email-maxim.uvarov@linaro.org |
---|---|
State | New |
Headers | show |
On 2014-09-02 18:30, Maxim Uvarov wrote: > Implement odp implementation for linux-generic using standard > odp queue API. > > Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org> > --- > > New version of IPC queues. > > TODO before commit: > merge this patch with Petri's change to odp_shm_reserve(). > > Maxim. > > > .gitignore | 5 + > configure.ac | 1 + > example/Makefile.am | 2 +- > example/generator/odp_generator.c | 6 +- > example/ipc/Makefile.am | 6 + > example/ipc/README | 46 ++ > example/ipc/odp_ipc.c | 685 +++++++++++++++++++++ > example/l2fwd/odp_l2fwd.c | 6 +- > example/odp_example/odp_example.c | 3 +- > example/packet/odp_pktio.c | 6 +- > example/timer/odp_timer_test.c | 3 +- > include/helper/odp_ring.h | 2 + > include/odp_queue.h | 2 + > include/odp_shared_memory.h | 12 +- > .../linux-generic/include/api/odp_pktio_types.h | 1 + > .../linux-generic/include/odp_packet_io_internal.h | 1 + > .../linux-generic/include/odp_queue_internal.h | 12 +- > platform/linux-generic/odp_buffer_pool.c | 3 +- > platform/linux-generic/odp_packet_io.c | 32 +- > platform/linux-generic/odp_queue.c | 205 +++++- > platform/linux-generic/odp_ring.c | 20 +- > platform/linux-generic/odp_schedule.c | 6 +- > platform/linux-generic/odp_shared_memory.c | 73 ++- > test/api_test/odp_shm_test.c | 3 +- > test/api_test/odp_timer_ping.c | 3 +- > 25 files changed, 1110 insertions(+), 34 deletions(-) > create mode 100644 example/ipc/Makefile.am > create mode 100644 example/ipc/README > create mode 100644 example/ipc/odp_ipc.c > > diff --git a/.gitignore b/.gitignore > index 39c8d77..7eca389 100644 > --- a/.gitignore > +++ b/.gitignore > @@ -5,11 +5,15 @@ > *.patch > *~ > *.lo > +*.swp > +*.swo > +.dirstamp Doesn't belong in this commit. > Makefile > Makefile.in > aclocal.m4 > autom4te.cache/ > compile > +core > config.guess > config.sub > configure > @@ -32,6 +36,7 @@ lib/ > obj/ > build/ > odp_example > +odp_ipc > odp_packet > odp_packet_netmap > odp_atomic > diff --git a/configure.ac b/configure.ac > index 6b75e66..4f4a913 100644 > --- a/configure.ac > +++ b/configure.ac > @@ -119,6 +119,7 @@ AC_CONFIG_FILES([Makefile > platform/linux-keystone2/Makefile > platform/linux-dpdk/Makefile > example/Makefile > + example/ipc/Makefile > example/generator/Makefile > example/l2fwd/Makefile > example/odp_example/Makefile > diff --git a/example/Makefile.am b/example/Makefile.am > index 01a3305..1a5a138 100644 > --- a/example/Makefile.am > +++ b/example/Makefile.am > @@ -1 +1 @@ > -SUBDIRS = generator l2fwd odp_example packet packet_netmap timer > +SUBDIRS = generator l2fwd odp_example packet packet_netmap timer ipc > diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c > index b10372e..102102f 100644 > --- a/example/generator/odp_generator.c > +++ b/example/generator/odp_generator.c > @@ -542,7 +542,8 @@ int main(int argc, char *argv[]) > odp_atomic_init_u64(&counters.icmp); > > /* Reserve memory for args from shared mem */ > - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); > + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > if (args == NULL) { > ODP_ERR("Error: shared mem alloc failed.\n"); > exit(EXIT_FAILURE); > @@ -587,7 +588,8 @@ int main(int argc, char *argv[]) > > /* Create packet pool */ > pool_base = odp_shm_reserve("shm_packet_pool", > - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > if (pool_base == NULL) { > ODP_ERR("Error: packet pool mem alloc failed.\n"); > exit(EXIT_FAILURE); > diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am > new file mode 100644 > index 0000000..2fd48f7 > --- /dev/null > +++ b/example/ipc/Makefile.am > @@ -0,0 +1,6 @@ > +include $(top_srcdir)/example/Makefile.inc > + > +bin_PROGRAMS = odp_ipc > +odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static > + > +dist_odp_ipc_SOURCES = odp_ipc.c > diff --git a/example/ipc/README b/example/ipc/README > new file mode 100644 > index 0000000..34f56d7 > --- /dev/null > +++ b/example/ipc/README > @@ -0,0 +1,46 @@ > + ODP IPC example > + > +This example shows how to use queues to excahnge packets between different > +processes. > + > +Example burst mode: > +./odp_fork -i eth0 -m 1 -c 1 > +On remote host run ping. ping what? I guess the the target that runs odp_fork if so please write that. Can you add some ascii art? > + > +[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3 > +11490 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4 > + > + > +Main PID/thread [11492/1] enqueues packets to IPC queue with odp_queue_enq_multi(), > +child process thread ring_thread() dequeues packets from ipc queue. > + > + > +Example queue mode: > + > +./odp_fork -i eth0 -m 1 -c 1 > +waiting for packet... > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > +15917 no valid buffer > + ring_thread() got buffer from IPC queue size 98/1856 > +Enqueue the packet to ipc queue size 98/1856 > +waiting for packet... > + > +Thread 15917 moves packets from ingress queue to IPC queue. Other process > +in ring_thread() thread dequeues packets from IPC queue. > diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c > new file mode 100644 > index 0000000..e10874e > --- /dev/null > +++ b/example/ipc/odp_ipc.c > @@ -0,0 +1,685 @@ > +/* Copyright (c) 2014, Linaro Limited > + * All rights reserved. > + * > + * SPDX-License-Identifier: BSD-3-Clause > + */ > + > +/** > + * @file > + * > + * @example odp_ipc.c ODP IPC queues example application > + */ > + > +#include <stdlib.h> > +#include <string.h> > +#include <getopt.h> > +#include <unistd.h> > + > +#include <odp.h> > +#include <helper/odp_linux.h> > +#include <helper/odp_packet_helper.h> > +#include <helper/odp_eth.h> > +#include <helper/odp_ip.h> > +#include <helper/odp_ring.h> > + > +#define MAX_WORKERS 32 > +#define SHM_PKT_POOL_SIZE (512*2048) > +#define SHM_PKT_POOL_BUF_SIZE 1856 > +#define MAX_PKT_BURST 16 > + > +#define APPL_MODE_PKT_BURST 0 > +#define APPL_MODE_PKT_QUEUE 1 > + > +#define RING_SIZE 4096 > +#define ODP_RING_NAMESIZE 32 > + > +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x)) > + > +/** Get rid of path in filename - only for unix-type paths using '/' */ > +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \ > + strrchr((file_name), '/') + 1 : (file_name)) > +/** > + * Parsed command line application arguments > + */ > +typedef struct { > + int core_count; Miss doxygen comment > + int if_count; /**< Number of interfaces to be used */ > + char **if_names; /**< Array of pointers to interface names */ > + int mode; /**< Packet IO mode */ > + int type; /**< Packet IO type */ > + int fanout; /**< Packet IO fanout */ describe more in the comments of the tree comments above > + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ > +} appl_args_t; > + > +/** > + * Thread specific arguments > + */ > +typedef struct { > + char *pktio_dev; /**< Interface name to use */ > + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ > + int mode; /**< Thread mode */ > + int type; /**< Thread i/o type */ > + int fanout; /**< Thread i/o fanout */ describe more in the comments of the tree comments above > + int tpid; Miss doxygen comment > +} thread_args_t; > + > +/** > + * Grouping of both parsed CL args and thread specific args - alloc together > + */ > +typedef struct { > + /** Application (parsed) arguments */ > + appl_args_t appl; > + /** Thread specific arguments */ Can't this two comments go at the end? > + thread_args_t thread[MAX_WORKERS]; > +} args_t; > + > +/** Global pointer to args */ > +static args_t *args; > + > +/* helper funcs */ > +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len); > +static void parse_args(int argc, char *argv[], appl_args_t *appl_args); > +static void print_info(char *progname, appl_args_t *appl_args); > +static void usage(char *progname); > + > +static void *ring_thread(void *arg) > +{ > + thread_args_t *thr_args; > + thr_args = arg; > + int ret; > + odp_buffer_t buf; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_params_t pktio_ipc_params; > + odp_pktio_t pktio_ipc; > + odp_queue_t ipcq_def; > + > + printf("ODP RING THREAD PID %d\n" ,getpid()); > + > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { shouldn't we use unlikely here? > + ODP_ERR("Error: pkt_pool not found\n"); > + return NULL; > + } > + > + /* create shared queue between processes*/ > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { shouldn't we use unlikely here? > + ODP_ERR("Error: pktio create failed\n"); > + return NULL; > + } > + > + if (thr_args->tpid) { > + while (1) { > + ipcq_def = odp_queue_lookup("shared-queue"); > + if (ipcq_def != ODP_QUEUE_INVALID) { > + printf("%s() shared-queue found\n", __func__); > + break; > + } > + sleep(1); > + } rewrite the above while like this to make it clearer? ipcq_def = odp_queue_lookup("shared-queue"); while (ipcq_def != ODP_QUEUE_INVALID) { sleep(1); ipcq_def = odp_queue_lookup("shared-queue"); } printf("%s() shared-queue found\n", __func__); > + > + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); > + if (ret != 0) { shouldn't we use unlikely here? > + ODP_ERR("Error: slave thread default ipc-Q setup\n"); > + return NULL; > + } > + > + /* In loop take packets from ipc queue and free this buffer */ > + while (1) { > + buf = odp_queue_deq(ipcq_def); > + if (!odp_buffer_is_valid(buf)) { shouldn't we use unlikely here? > + sleep(1); > + printf("%d no valid buffer\n", getpid()); > + continue; > + } > + > + //buf = odp_schedule(NULL, ODP_SCHED_WAIT); Remove. Cheers, Anders > + > + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", __func__, > + (unsigned long)odp_packet_get_len(buf), > + (unsigned long)odp_buffer_size(buf)); > + odp_buffer_free(buf); > + } > + } > + > + /* unreachable */ > + return NULL; > +} > + > + > +/** > + * Packet IO loopback worker thread using ODP queues > + * > + * @param arg thread arguments of type 'thread_args_t *' > + */ > +static void *pktio_queue_thread(void *arg) > +{ > + int thr; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_t pktio; > + odp_pktio_t pktio_ipc; > + thread_args_t *thr_args; > + odp_queue_t inq_def; > + odp_queue_t ipcq_def; > + char inq_name[ODP_QUEUE_NAME_LEN]; > + odp_queue_param_t qparam; > + odp_buffer_t buf; > + int ret; > + odp_pktio_params_t params; > + odp_pktio_params_t pktio_ipc_params; > + socket_params_t *sock_params = ¶ms.sock_params; > + > + thr_args = arg; > + > + thr = odp_thread_id(); > + > + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, > + thr_args->pktio_dev); > + > + /* lookup ring from its name */ > + /* Lookup the packet pool */ > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { > + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); > + return NULL; > + } > + > + /* Open a packet IO instance for this thread */ > + sock_params->type = thr_args->type; > + sock_params->fanout = thr_args->fanout; > + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); > + if (pktio == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } > + > + /* > + * Create and set the default INPUT queue associated with the 'pktio' > + * resource > + */ > + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; > + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; > + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; > + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); > + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; > + > + inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam); > + if (inq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + ret = odp_pktio_inq_setdef(pktio, inq_def); > + if (ret != 0) { > + ODP_ERR(" [%02i] Error: default input-Q setup\n", thr); > + return NULL; > + } > + > + printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n" > + " default pktio%02i-INPUT queue:%u\n", > + thr, pktio, pktio, inq_def); > + > + /* create shared queue between processes*/ > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } > + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC, &qparam); > + if (ipcq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + /* In loop take packets from inq queue and put them to ipc queue */ > + for (;;) { > + /* Use schedule to get buf from any input queue */ > + printf("waiting for packet...\n"); > + buf = odp_schedule(NULL, ODP_SCHED_WAIT); > + > + printf("Enqueue the packet to ipc queue size %ld/%ld\n", > + (unsigned long)odp_packet_get_len(buf), > + (unsigned long)odp_buffer_size(buf)); > + > + odp_queue_enq(ipcq_def, buf); > + } > + > +/* unreachable */ > +} > + > +/** > + * Packet IO loopback worker thread using bursts from/to IO resources > + * > + * @param arg thread arguments of type 'thread_args_t *' > + */ > +static void *pktio_ifburst_thread(void *arg) > +{ > + int thr; > + odp_buffer_pool_t pkt_pool; > + odp_pktio_t pktio; > + thread_args_t *thr_args; > + int pkts, pkts_ok; > + odp_packet_t pkt_tbl[MAX_PKT_BURST]; > + unsigned long pkt_cnt = 0; > + unsigned long err_cnt = 0; > + odp_pktio_params_t params; > + socket_params_t *sock_params = ¶ms.sock_params; > + int ret; > + > + odp_pktio_t pktio_ipc; > + odp_queue_t ipcq_def; > + char inq_name[ODP_QUEUE_NAME_LEN]; > + odp_queue_param_t qparam; > + odp_pktio_params_t pktio_ipc_params; > + > + thr = odp_thread_id(); > + thr_args = arg; > + > + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, > + thr_args->pktio_dev); > + > + /* Lookup the packet pool */ > + pkt_pool = odp_buffer_pool_lookup("packet_pool"); > + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { > + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); > + return NULL; > + } > + > + /* Open a packet IO instance for this thread */ > + sock_params->type = thr_args->type; > + sock_params->fanout = thr_args->fanout; > + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); > + if (pktio == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed.\n", thr); > + return NULL; > + } > + > + printf(" [%02i] created pktio:%02i, burst mode\n", > + thr, pktio); > + > + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; > + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); > + if (pktio_ipc == ODP_PKTIO_INVALID) { > + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); > + return NULL; > + } > + > + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; > + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; > + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; > + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); > + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; > + > + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC, &qparam); > + if (ipcq_def == ODP_QUEUE_INVALID) { > + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); > + return NULL; > + } > + > + /* Loop packets */ > + for (;;) { > + pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST); > + if (pkts > 0) { > + /* Drop packets with errors */ > + pkts_ok = drop_err_pkts(pkt_tbl, pkts); > + if (pkts_ok > 0) { > + ret = odp_queue_enq_multi(ipcq_def, pkt_tbl, pkts_ok); > + pkt_cnt += pkts_ok; > + if (ret != 0) { > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + } else { > + printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n", > + getpid(), thr, pkts_ok, > + pkt_tbl[0], > + (unsigned long)odp_packet_get_len(pkt_tbl[0]), > + (unsigned long)odp_buffer_size(pkt_tbl[0]), > + pkt_cnt); > + } > + } > + > + if (odp_unlikely(pkts_ok != pkts)) > + ODP_ERR("Dropped frames:%u - err_cnt:%lu\n", > + pkts-pkts_ok, ++err_cnt); > + } > + } > + > +/* unreachable */ > +} > + > +/** > + * ODP packet example main function > + */ > +int main(int argc, char *argv[]) > +{ > + odp_linux_pthread_t thread_tbl[MAX_WORKERS]; > + odp_buffer_pool_t pool; > + int thr_id; > + int num_workers; > + void *pool_base; > + int i; > + int first_core; > + int core_count; > + > + /* Init ODP before calling anything else */ > + if (odp_init_global()) { > + ODP_ERR("Error: ODP global init failed.\n"); > + exit(EXIT_FAILURE); > + } > + > + args = malloc(sizeof(args_t)); > + if (args == NULL) { > + ODP_ERR("Error: shared mem alloc failed.\n"); > + exit(EXIT_FAILURE); > + } > + memset(args, 0, sizeof(*args)); > + > + /* Parse and store the application arguments */ > + parse_args(argc, argv, &args->appl); > + > + /* Print both system and application information */ > + print_info(NO_PATH(argv[0]), &args->appl); > + > + core_count = odp_sys_core_count(); > + num_workers = core_count; > + > + if (args->appl.core_count) > + num_workers = args->appl.core_count; > + > + if (num_workers > MAX_WORKERS) > + num_workers = MAX_WORKERS; > + > + printf("Num worker threads: %i\n", num_workers); > + > + /* > + * By default core #0 runs Linux kernel background tasks. > + * Start mapping thread from core #1 > + */ > + first_core = 1; > + > + if (core_count == 1) > + first_core = 0; > + > + printf("First core: %i\n\n", first_core); > + > + /* Init this thread */ > + thr_id = odp_thread_create(0); > + odp_init_local(thr_id); > + > + /* Create packet pool */ > + pool_base = odp_shm_reserve("shm_packet_pool", > + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_PROC); > + if (pool_base == NULL) { > + ODP_ERR("Error: packet pool mem alloc failed.\n"); > + exit(EXIT_FAILURE); > + } > + > + pool = odp_buffer_pool_create("packet_pool", pool_base, > + SHM_PKT_POOL_SIZE, > + SHM_PKT_POOL_BUF_SIZE, > + ODP_CACHE_LINE_SIZE, > + ODP_BUFFER_TYPE_PACKET); > + if (pool == ODP_BUFFER_POOL_INVALID) { > + ODP_ERR("Error: packet pool create failed.\n"); > + exit(EXIT_FAILURE); > + } > + odp_buffer_pool_print(pool); > + > + > + /* Create another process */ > + int f = fork(); > + > + /* Create and init worker threads */ > + memset(thread_tbl, 0, sizeof(thread_tbl)); > + for (i = 0; i < num_workers; ++i) { > + void *(*thr_run_func) (void *); > + int core; > + int if_idx; > + > + core = (first_core + i) % core_count; > + > + if_idx = i % args->appl.if_count; > + > + args->thread[i].pktio_dev = args->appl.if_names[if_idx]; > + args->thread[i].pool = pool; > + args->thread[i].mode = args->appl.mode; > + args->thread[i].type = args->appl.type; > + args->thread[i].fanout = args->appl.fanout; > + args->thread[i].tpid = f; > + > + if (f) { > + thr_run_func = ring_thread; > + } else { > + if (args->appl.mode == APPL_MODE_PKT_BURST) > + thr_run_func = pktio_ifburst_thread; > + else /* APPL_MODE_PKT_QUEUE */ > + thr_run_func = pktio_queue_thread; > + } > + /* > + * Create threads one-by-one instead of all-at-once, > + * because each thread might get different arguments. > + * Calls odp_thread_create(cpu) for each thread > + */ > + odp_linux_pthread_create(thread_tbl, 1, core, thr_run_func, > + &args->thread[i]); > + } > + > + /* Master thread waits for other threads to exit */ > + odp_linux_pthread_join(thread_tbl, num_workers); > + > + printf("Exit\n\n"); > + > + return 0; > +} > + > +/** > + * Drop packets which input parsing marked as containing errors. > + * > + * Frees packets with error and modifies pkt_tbl[] to only contain packets with > + * no detected errors. > + * > + * @param pkt_tbl Array of packet > + * @param len Length of pkt_tbl[] > + * > + * @return Number of packets with no detected error > + */ > +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len) > +{ > + odp_packet_t pkt; > + unsigned pkt_cnt = len; > + unsigned i, j; > + > + for (i = 0, j = 0; i < len; ++i) { > + pkt = pkt_tbl[i]; > + > + if (odp_unlikely(odp_packet_error(pkt))) { > + odp_packet_free(pkt); /* Drop */ > + pkt_cnt--; > + } else if (odp_unlikely(i != j++)) { > + pkt_tbl[j-1] = pkt; > + } > + } > + > + return pkt_cnt; > +} > + > +/** > + * Parse and store the command line arguments > + * > + * @param argc argument count > + * @param argv[] argument vector > + * @param appl_args Store application arguments here > + */ > +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) > +{ > + int opt; > + int long_index; > + char *names, *str, *token, *save; > + int i; > + int len; > + static struct option longopts[] = { > + {"count", required_argument, NULL, 'c'}, > + {"interface", required_argument, NULL, 'i'}, /* return 'i' */ > + {"mode", required_argument, NULL, 'm'}, /* return 'm' */ > + {"help", no_argument, NULL, 'h'}, /* return 'h' */ > + {NULL, 0, NULL, 0} > + }; > + > + appl_args->mode = -1; /* Invalid, must be changed by parsing */ > + appl_args->type = 3; /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */ > + appl_args->fanout = 1; /* turn off fanout by default for mmap */ > + > + while (1) { > + opt = getopt_long(argc, argv, "+c:i:m:t:f:h", > + longopts, &long_index); > + > + if (opt == -1) > + break; /* No more options */ > + > + switch (opt) { > + case 'c': > + appl_args->core_count = atoi(optarg); > + break; > + /* parse packet-io interface names */ > + case 'i': > + len = strlen(optarg); > + if (len == 0) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + len += 1; /* add room for '\0' */ > + > + names = malloc(len); > + if (names == NULL) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + /* count the number of tokens separated by ',' */ > + strcpy(names, optarg); > + for (str = names, i = 0;; str = NULL, i++) { > + token = strtok_r(str, ",", &save); > + if (token == NULL) > + break; > + } > + appl_args->if_count = i; > + > + if (appl_args->if_count == 0) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + /* allocate storage for the if names */ > + appl_args->if_names = > + calloc(appl_args->if_count, sizeof(char *)); > + > + /* store the if names (reset names string) */ > + strcpy(names, optarg); > + for (str = names, i = 0;; str = NULL, i++) { > + token = strtok_r(str, ",", &save); > + if (token == NULL) > + break; > + appl_args->if_names[i] = token; > + } > + break; > + > + case 'm': > + i = atoi(optarg); > + if (i == 0) > + appl_args->mode = APPL_MODE_PKT_BURST; > + else > + appl_args->mode = APPL_MODE_PKT_QUEUE; > + break; > + > + case 't': > + appl_args->type = atoi(optarg); > + break; > + > + case 'f': > + appl_args->fanout = atoi(optarg); > + break; > + > + case 'h': > + usage(argv[0]); > + exit(EXIT_SUCCESS); > + break; > + > + default: > + break; > + } > + } > + > + if (appl_args->if_count == 0 || appl_args->mode == -1) { > + usage(argv[0]); > + exit(EXIT_FAILURE); > + } > + > + optind = 1; /* reset 'extern optind' from the getopt lib */ > +} > + > +/** > + * Print system and application info > + */ > +static void print_info(char *progname, appl_args_t *appl_args) > +{ > + int i; > + > + printf("\n" > + "ODP system info\n" > + "---------------\n" > + "ODP API version: %s\n" > + "CPU model: %s\n" > + "CPU freq (hz): %"PRIu64"\n" > + "Cache line size: %i\n" > + "Core count: %i\n" > + "\n", > + odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(), > + odp_sys_cache_line_size(), odp_sys_core_count()); > + > + printf("Running ODP appl: \"%s\"\n" > + "-----------------\n" > + "IF-count: %i\n" > + "Using IFs: ", > + progname, appl_args->if_count); > + for (i = 0; i < appl_args->if_count; ++i) > + printf(" %s", appl_args->if_names[i]); > + printf("\n" > + "Mode: "); > + if (appl_args->mode == APPL_MODE_PKT_BURST) > + PRINT_APPL_MODE(APPL_MODE_PKT_BURST); > + else > + PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE); > + printf("\n\n"); > + fflush(NULL); > +} > + > +/** > + * Prinf usage information > + */ > +static void usage(char *progname) > +{ > + printf("\n" > + "Usage: %s OPTIONS\n" > + " E.g. %s -i eth1,eth2,eth3 -m 0\n" > + "\n" > + "OpenDataPlane example application.\n" > + "\n" > + "Mandatory OPTIONS:\n" > + " -i, --interface Eth interfaces (comma-separated, no spaces)\n" > + " -m, --mode 0: Burst send&receive packets (no queues)\n" > + " 1: Send&receive packets through ODP queues.\n" > + " -t, --type 1: ODP_PKTIO_TYPE_SOCKET_BASIC\n" > + " 2: ODP_PKTIO_TYPE_SOCKET_MMSG\n" > + " 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" > + " 4: ODP_PKTIO_TYPE_NETMAP\n" > + " Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" > + " -f, --fanout 0: off 1: on (Default 1: on)\n" > + "\n" > + "Optional OPTIONS\n" > + " -c, --count <number> Core count.\n" > + " -h, --help Display help and exit.\n" > + "\n", NO_PATH(progname), NO_PATH(progname) > + ); > +} > diff --git a/example/l2fwd/odp_l2fwd.c b/example/l2fwd/odp_l2fwd.c > index f89ea7a..3a78761 100644 > --- a/example/l2fwd/odp_l2fwd.c > +++ b/example/l2fwd/odp_l2fwd.c > @@ -294,7 +294,8 @@ int main(int argc, char *argv[]) > } > > /* Reserve memory for args from shared mem */ > - gbl_args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); > + gbl_args = odp_shm_reserve("shm_args", sizeof(args_t), > + ODP_CACHE_LINE_SIZE, ODP_SHM_THREAD); > if (gbl_args == NULL) { > ODP_ERR("Error: shared mem alloc failed.\n"); > exit(EXIT_FAILURE); > @@ -345,7 +346,8 @@ int main(int argc, char *argv[]) > > /* Create packet pool */ > pool_base = odp_shm_reserve("shm_packet_pool", > - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > if (pool_base == NULL) { > ODP_ERR("Error: packet pool mem alloc failed.\n"); > exit(EXIT_FAILURE); > diff --git a/example/odp_example/odp_example.c b/example/odp_example/odp_example.c > index be96093..6d075b2 100644 > --- a/example/odp_example/odp_example.c > +++ b/example/odp_example/odp_example.c > @@ -987,7 +987,8 @@ int main(int argc, char *argv[]) > * Create message pool > */ > pool_base = odp_shm_reserve("msg_pool", > - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > > pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, > sizeof(test_message_t), > diff --git a/example/packet/odp_pktio.c b/example/packet/odp_pktio.c > index 247a28a..c7ff4ef 100644 > --- a/example/packet/odp_pktio.c > +++ b/example/packet/odp_pktio.c > @@ -291,7 +291,8 @@ int main(int argc, char *argv[]) > } > > /* Reserve memory for args from shared mem */ > - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); > + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > if (args == NULL) { > ODP_ERR("Error: shared mem alloc failed.\n"); > exit(EXIT_FAILURE); > @@ -332,7 +333,8 @@ int main(int argc, char *argv[]) > > /* Create packet pool */ > pool_base = odp_shm_reserve("shm_packet_pool", > - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > if (pool_base == NULL) { > ODP_ERR("Error: packet pool mem alloc failed.\n"); > exit(EXIT_FAILURE); > diff --git a/example/timer/odp_timer_test.c b/example/timer/odp_timer_test.c > index dbe0e5b..113200b 100644 > --- a/example/timer/odp_timer_test.c > +++ b/example/timer/odp_timer_test.c > @@ -260,7 +260,8 @@ int main(int argc, char *argv[]) > * Create message pool > */ > pool_base = odp_shm_reserve("msg_pool", > - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > > pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, > 0, > diff --git a/include/helper/odp_ring.h b/include/helper/odp_ring.h > index 0911531..60960a2 100644 > --- a/include/helper/odp_ring.h > +++ b/include/helper/odp_ring.h > @@ -158,6 +158,8 @@ typedef struct odp_ring { > > #define ODP_RING_F_SP_ENQ 0x0001 /* The default enqueue is "single-producer". */ > #define ODP_RING_F_SC_DEQ 0x0002 /* The default dequeue is "single-consumer". */ > +#define ODP_RING_SHM_PROC 0x0004 /* If set - ring is visible from different > + processes. Default is thread visible. */ > #define ODP_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */ > #define ODP_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */ > > diff --git a/include/odp_queue.h b/include/odp_queue.h > index 5e083f1..4700a62 100644 > --- a/include/odp_queue.h > +++ b/include/odp_queue.h > @@ -44,6 +44,8 @@ typedef int odp_queue_type_t; > #define ODP_QUEUE_TYPE_POLL 1 /**< Not scheduled queue */ > #define ODP_QUEUE_TYPE_PKTIN 2 /**< Packet input queue */ > #define ODP_QUEUE_TYPE_PKTOUT 3 /**< Packet output queue */ > +#define ODP_QUEUE_TYPE_IPC 4 /**< Packet ipc queue */ > +#define ODP_QUEUE_TYPE_IPC_LOOKUP 5 /**< Packet ipc queue */ > > /** > * ODP schedule priority > diff --git a/include/odp_shared_memory.h b/include/odp_shared_memory.h > index 8ac8847..1d8e8bb 100644 > --- a/include/odp_shared_memory.h > +++ b/include/odp_shared_memory.h > @@ -24,6 +24,13 @@ extern "C" { > /** Maximum shared memory block name lenght in chars */ > #define ODP_SHM_NAME_LEN 32 > > +typedef enum { > + ODP_SHM_THREAD = 1, /**< Memory accessible by threads. */ > + ODP_SHM_PROC = 2, /**< Memory accessible by processes. > + Will be created if not exist. */ > + ODP_SHM_PROC_NOCREAT = 3, /**< Memory accessible by processes. > + Has to be created before usage.*/ > +} odp_shm_e; > > /** > * Reserve a block of shared memory > @@ -31,10 +38,12 @@ extern "C" { > * @param name Name of the block (maximum ODP_SHM_NAME_LEN - 1 chars) > * @param size Block size in bytes > * @param align Block alignment in bytes > + * @param flag Flags for shared memory creation > * > * @return Pointer to the reserved block, or NULL > */ > -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align); > +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > + odp_shm_e flag); > > /** > * Lookup for a block of shared memory > @@ -44,6 +53,7 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align); > * @return Pointer to the block, or NULL > */ > void *odp_shm_lookup(const char *name); > +int odp_shm_lookup_ipc(const char *name); > > > /** > diff --git a/platform/linux-generic/include/api/odp_pktio_types.h b/platform/linux-generic/include/api/odp_pktio_types.h > index 8d195a5..e8e27cc 100644 > --- a/platform/linux-generic/include/api/odp_pktio_types.h > +++ b/platform/linux-generic/include/api/odp_pktio_types.h > @@ -21,6 +21,7 @@ typedef enum { > ODP_PKTIO_TYPE_SOCKET_MMSG, > ODP_PKTIO_TYPE_SOCKET_MMAP, > ODP_PKTIO_TYPE_NETMAP, > + ODP_PKTIO_TYPE_IPC, > } odp_pktio_type_t; > > #include <odp_pktio_socket.h> > diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h > index 881cc5f..77fff96 100644 > --- a/platform/linux-generic/include/odp_packet_io_internal.h > +++ b/platform/linux-generic/include/odp_packet_io_internal.h > @@ -35,6 +35,7 @@ struct pktio_entry { > #ifdef ODP_HAVE_NETMAP > pkt_netmap_t pkt_nm; /**< using netmap API for IO */ > #endif > + odp_buffer_pool_t pool; /**< reference to packet pool */ > }; > > typedef union { > diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h > index 8b6c517..f536331 100644 > --- a/platform/linux-generic/include/odp_queue_internal.h > +++ b/platform/linux-generic/include/odp_queue_internal.h > @@ -23,6 +23,7 @@ extern "C" { > #include <odp_packet_io.h> > #include <odp_align.h> > > +#include <helper/odp_ring.h> > > #define USE_TICKETLOCK > > @@ -39,6 +40,8 @@ extern "C" { > #define QUEUE_STATUS_NOTSCHED 2 > #define QUEUE_STATUS_SCHED 3 > > +#define QUEUE_IPC_ENTRIES 4096 /**< number of odp buffers in odp ring queue */ > + > /* forward declaration */ > union queue_entry_u; > > @@ -65,13 +68,14 @@ struct queue_entry_s { > deq_func_t dequeue; > enq_multi_func_t enqueue_multi; > deq_multi_func_t dequeue_multi; > - > odp_queue_t handle; > odp_buffer_t sched_buf; > odp_queue_type_t type; > odp_queue_param_t param; > odp_pktio_t pktin; > odp_pktio_t pktout; > + odp_ring_t *r; /* ring ref */ > + odp_buffer_t **r_p; /* ring memory */ > char name[ODP_QUEUE_NAME_LEN]; > }; > > @@ -84,10 +88,16 @@ typedef union queue_entry_u { > queue_entry_t *get_qentry(uint32_t queue_id); > > int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); > +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); > + > odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); > +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue); > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > + > int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); > > void queue_lock(queue_entry_t *queue); > void queue_unlock(queue_entry_t *queue); > diff --git a/platform/linux-generic/odp_buffer_pool.c b/platform/linux-generic/odp_buffer_pool.c > index a48781a..9157994 100644 > --- a/platform/linux-generic/odp_buffer_pool.c > +++ b/platform/linux-generic/odp_buffer_pool.c > @@ -102,7 +102,8 @@ int odp_buffer_pool_init_global(void) > > pool_tbl = odp_shm_reserve("odp_buffer_pools", > sizeof(pool_table_t), > - sizeof(pool_entry_t)); > + sizeof(pool_entry_t), > + ODP_SHM_THREAD); > > if (pool_tbl == NULL) > return -1; > diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c > index 33ade10..6672d6b 100644 > --- a/platform/linux-generic/odp_packet_io.c > +++ b/platform/linux-generic/odp_packet_io.c > @@ -55,7 +55,8 @@ int odp_pktio_init_global(void) > > pktio_tbl = odp_shm_reserve("odp_pktio_entries", > sizeof(pktio_table_t), > - sizeof(pktio_entry_t)); > + sizeof(pktio_entry_t), > + ODP_SHM_THREAD); > if (pktio_tbl == NULL) > return -1; > > @@ -129,6 +130,8 @@ static void init_pktio_entry(pktio_entry_t *entry, odp_pktio_params_t *params) > memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm)); > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + break; > default: > ODP_ERR("Packet I/O type not supported. Please recompile\n"); > break; > @@ -194,6 +197,8 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, > ODP_DBG("Allocating netmap pktio\n"); > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + break; > default: > ODP_ERR("Invalid pktio type: %02x\n", params->type); > return ODP_PKTIO_INVALID; > @@ -239,6 +244,9 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, > } > break; > #endif > + case ODP_PKTIO_TYPE_IPC: > + pktio_entry->s.pool = pool; > + break; > default: > free_pktio_entry(id); > id = ODP_PKTIO_INVALID; > @@ -381,11 +389,23 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) > pktio_entry_t *pktio_entry = get_entry(id); > queue_entry_t *qentry = queue_to_qentry(queue); > > - if (pktio_entry == NULL || qentry == NULL) > + if (pktio_entry == NULL || qentry == NULL) { > + ODP_ERR("%s() return -q reason %p -- %p\n", > + __func__, > + pktio_entry, qentry); > return -1; > + } > > - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) > + switch (qentry->s.type) > + { > + case ODP_QUEUE_TYPE_PKTIN: > + case ODP_QUEUE_TYPE_IPC: > + case ODP_QUEUE_TYPE_IPC_LOOKUP: > + break; > + default: > + ODP_ERR("%s() type is %d\n", __func__, qentry->s.type); > return -1; > + } > > lock_entry(pktio_entry); > pktio_entry->s.inq_default = queue; > @@ -396,6 +416,12 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) > qentry->s.status = QUEUE_STATUS_SCHED; > queue_unlock(qentry); > > + if (qentry->s.type == ODP_QUEUE_TYPE_IPC) > + return 0; > + if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP) > + return 0; > + > + > odp_schedule_queue(queue, qentry->s.param.sched.prio); > > return 0; > diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c > index c637bdf..d08e72e 100644 > --- a/platform/linux-generic/odp_queue.c > +++ b/platform/linux-generic/odp_queue.c > @@ -21,6 +21,10 @@ > #include <odp_hints.h> > #include <odp_sync.h> > > +#include <helper/odp_ring.h> > +#include <sys/types.h> > +#include <unistd.h> > + > #ifdef USE_TICKETLOCK > #include <odp_ticketlock.h> > #define LOCK(a) odp_ticketlock_lock(a) > @@ -34,7 +38,7 @@ > #endif > > #include <string.h> > - > +#include <stdlib.h> > > typedef struct queue_table_t { > queue_entry_t queue[ODP_CONFIG_QUEUES]; > @@ -77,6 +81,41 @@ static void queue_init(queue_entry_t *queue, const char *name, > queue->s.enqueue_multi = pktout_enq_multi; > queue->s.dequeue_multi = pktout_deq_multi; > break; > + case ODP_QUEUE_TYPE_IPC: > + queue->s.r = odp_ring_lookup(name); > + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES * > + sizeof(odp_buffer_t)); > + if (!queue->s.r) > + { > + queue->s.r = odp_ring_create(name, QUEUE_IPC_ENTRIES, ODP_RING_SHM_PROC); > + if (queue->s.r == NULL) { > + ODP_ERR("ring create failed\n"); > + } > + } > + queue->s.enqueue = queue_enq_ipc; > + queue->s.dequeue = queue_deq_ipc; > + queue->s.enqueue_multi = queue_enq_multi_ipc; > + queue->s.dequeue_multi = queue_deq_multi_ipc; > + break; > + case ODP_QUEUE_TYPE_IPC_LOOKUP: > + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES * > + sizeof(odp_buffer_t)); > + if (odp_shm_lookup_ipc(name) == 1) > + { > + size_t ring_size = QUEUE_IPC_ENTRIES * sizeof(void *) > + + sizeof(odp_ring_t); > + queue->s.r = odp_shm_reserve(name, ring_size, > + ODP_CACHE_LINE_SIZE, > + ODP_SHM_PROC_NOCREAT); > + if (queue->s.r == NULL) { > + ODP_ERR("LOOKUP ring create failed\n"); > + } > + } > + queue->s.enqueue = queue_enq_ipc; > + queue->s.dequeue = queue_deq_ipc; > + queue->s.enqueue_multi = queue_enq_multi_ipc; > + queue->s.dequeue_multi = queue_deq_multi_ipc; > + break; > default: > queue->s.enqueue = queue_enq; > queue->s.dequeue = queue_deq; > @@ -99,7 +138,8 @@ int odp_queue_init_global(void) > > queue_tbl = odp_shm_reserve("odp_queues", > sizeof(queue_table_t), > - sizeof(queue_entry_t)); > + sizeof(queue_entry_t), > + ODP_SHM_THREAD); > > if (queue_tbl == NULL) > return -1; > @@ -113,6 +153,11 @@ int odp_queue_init_global(void) > queue->s.handle = queue_from_id(i); > } > > + /* for linux-generic IPC queue implemented totaly in > + * software using odp_ring. > + */ > + odp_ring_tailq_init(); > + > ODP_DBG("done\n"); > ODP_DBG("Queue init global\n"); > ODP_DBG(" struct queue_entry_s size %zu\n", > @@ -243,6 +288,27 @@ odp_queue_t odp_queue_lookup(const char *name) > UNLOCK(&queue->s.lock); > } > > + /* do look up for shared memory object if exist return that queue*/ > + odp_ring_t *r; > + > + r = odp_ring_lookup(name); > + if (r == NULL) { > + if ( odp_shm_lookup_ipc(name) == 1) { > + /* Create local IPC queue connected to shm object */ > + odp_queue_t q = odp_queue_create(name, > + ODP_QUEUE_TYPE_IPC_LOOKUP, NULL); > + if (q != ODP_QUEUE_INVALID) { > + return q; > + } > + } > + } else { > + /* odp ring is in odp_ring_list. That means current process already created > + * link with such name. That might be ipc queue or ring itself. For now > + * print error here > + */ > + ODP_ERR("odp ring with name: \"%s\" already initialized\n", name); > + } > + > return ODP_QUEUE_INVALID; > } > > @@ -276,6 +342,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > return 0; > } > > +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) > +{ > + int ret; > + odp_ring_t *r = queue->s.r; > + odp_buffer_bits_t handle; > + uint32_t index = buf_hdr->handle.index; > + uint32_t pool_id = buf_hdr->handle.pool; > + odp_buffer_t buf; > + void **rbuf_p; > + > + /* get buffer from buf_hdr */ > + handle.index = index; > + handle.pool = pool_id; > + > + buf = handle.u32; > + > + rbuf_p = (void*)&buf; > + /* use odp_ring locks instead of per process queue lock > + * LOCK(&queue->s.lock); > + */ > + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr > + * here due to poiter will be referenced in different porocess > + */ > + ret = odp_ring_mp_enqueue_bulk(r, rbuf_p, 1); > + if (ret != 0) > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + /* > + * UNLOCK(&queue->s.lock); > + */ > + return 0; > +} > + > > int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > { > @@ -311,6 +409,43 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > return 0; > } > > +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > +{ > + int i; > + int ret = 0; > + odp_ring_t *r = queue->s.r; > + odp_buffer_bits_t handle; > + odp_buffer_t buf; > + void **rbuf_p; > + > + /* use odp_ring locks instead of per process queue lock > + * LOCK(&queue->s.lock); > + */ > + > + /* odp_buffer_t buffers can be in not continius memory, > + * so queue them to IPC ring one by one. > + */ > + for (i = 0; i < num; i++) { > + handle.index = buf_hdr[i]->handle.index; > + handle.pool = buf_hdr[i]->handle.pool; > + > + buf = handle.u32; > + > + rbuf_p = (void*)&buf; > + > + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr > + * here due to poiter will be referenced in different porocess > + */ > + ret += odp_ring_mp_enqueue_bulk(r, rbuf_p, 1); > + if (ret != 0) > + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); > + } > + /* > + * UNLOCK(&queue->s.lock); > + */ > + > + return ret; > +} > > int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) > { > @@ -369,6 +504,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) > return buf_hdr; > } > > +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue) > +{ > + odp_buffer_hdr_t *buf_hdr = NULL; > + odp_ring_t *r = queue->s.r; > + int ret; > + odp_buffer_t buf; > + > + /* using odp_ring lock > + * LOCK(&queue->s.lock); > + */ > + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, 1); > + if (ret == 0) { > + memcpy(&buf, (void *)queue->s.r_p, > + sizeof(odp_buffer_t)); > + buf_hdr = odp_buf_to_hdr(buf); > + } > + /* > + * UNLOCK(&queue->s.lock); > + */ > + > + return buf_hdr; > +} > + > +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > +{ > + int i = 0; > + odp_ring_t *r = queue->s.r; > + int ret; > + odp_buffer_t buf; > + > + /* use odp ring lock > + * LOCK(&queue->s.lock); > + */ > + > + if (queue->s.head == NULL) { > + /* Already empty queue */ > + } else { > + odp_buffer_hdr_t *hdr = queue->s.head; > + > + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, num); > + if (ret == 0) { > + for (; i < num && hdr; i++) { > + memcpy(&buf, (void *)queue->s.r_p[i], > + sizeof(odp_buffer_t)); > + > + buf_hdr[i] = odp_buf_to_hdr(buf); > + hdr = hdr->next; > + buf_hdr[i]->next = NULL; > + } > + > + } > + > + queue->s.head = hdr; > + > + if (hdr == NULL) { > + /* Queue is now empty */ > + queue->s.tail = NULL; > + } > + } > + > + /* use odp_ring lock > + * UNLOCK(&queue->s.lock); > + */ > + > + return i; > +} > > int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) > { > diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c > index 25ff66a..40df789 100644 > --- a/platform/linux-generic/odp_ring.c > +++ b/platform/linux-generic/odp_ring.c > @@ -82,6 +82,9 @@ > #include <odp_rwlock.h> > #include <helper/odp_ring.h> > > +#include <sys/types.h> > +#include <unistd.h> > + > static TAILQ_HEAD(, odp_ring) odp_ring_list; > > /* > @@ -158,6 +161,12 @@ odp_ring_create(const char *name, unsigned count, unsigned flags) > char ring_name[ODP_RING_NAMESIZE]; > odp_ring_t *r; > size_t ring_size; > + odp_shm_e shm_flag; > + > + if (flags & ODP_RING_SHM_PROC) > + shm_flag = ODP_SHM_PROC; > + else > + shm_flag = ODP_SHM_THREAD; > > /* count must be a power of 2 */ > if (!ODP_VAL_IS_POWER_2(count) || (count > ODP_RING_SZ_MASK)) { > @@ -171,7 +180,8 @@ odp_ring_create(const char *name, unsigned count, unsigned flags) > > odp_rwlock_write_lock(&qlock); > /* reserve a memory zone for this ring.*/ > - r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE); > + r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, > + shm_flag); > > if (r != NULL) { > /* init the ring structure */ > @@ -545,12 +555,14 @@ void odp_ring_list_dump(void) > /* search a ring from its name */ > odp_ring_t *odp_ring_lookup(const char *name) > { > - odp_ring_t *r = odp_shm_lookup(name); > + odp_ring_t *r; > > odp_rwlock_read_lock(&qlock); > TAILQ_FOREACH(r, &odp_ring_list, next) { > - if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) > - break; > + if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) { > + odp_rwlock_read_unlock(&qlock); > + return r; > + } > } > odp_rwlock_read_unlock(&qlock); > > diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c > index 9e399f1..9e76692 100644 > --- a/platform/linux-generic/odp_schedule.c > +++ b/platform/linux-generic/odp_schedule.c > @@ -89,7 +89,8 @@ int odp_schedule_init_global(void) > > sched = odp_shm_reserve("odp_scheduler", > sizeof(sched_t), > - ODP_CACHE_LINE_SIZE); > + ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > > if (sched == NULL) { > ODP_ERR("Schedule init: Shm reserve failed.\n"); > @@ -98,7 +99,8 @@ int odp_schedule_init_global(void) > > > pool_base = odp_shm_reserve("odp_sched_pool", > - SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > > pool = odp_buffer_pool_create("odp_sched_pool", pool_base, > SCHED_POOL_SIZE, sizeof(queue_desc_t), > diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c > index 784f42b..ae53bb6 100644 > --- a/platform/linux-generic/odp_shared_memory.c > +++ b/platform/linux-generic/odp_shared_memory.c > @@ -14,10 +14,14 @@ > #include <sys/mman.h> > #include <asm/mman.h> > #include <fcntl.h> > +#include <unistd.h> > +#include <sys/types.h> > > #include <stdio.h> > #include <string.h> > > +#include <helper/odp_ring.h> > +#include <stdlib.h> > > #define ODP_SHM_NUM_BLOCKS 32 > > @@ -59,9 +63,8 @@ int odp_shm_init_global(void) > ODP_DBG("NOTE: mmap does not support huge pages\n"); > #endif > > - addr = mmap(NULL, sizeof(odp_shm_table_t), > - PROT_READ | PROT_WRITE, SHM_FLAGS, -1, 0); > - > + /* malloc instead of mmap to bind table to process. */ > + addr = malloc(sizeof(odp_shm_table_t)); > if (addr == MAP_FAILED) > return -1; > > @@ -95,9 +98,12 @@ static int find_block(const char *name) > } > > > -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) > +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align, > + odp_shm_e flag) > { > - int i; > + int i, ret, shm_open_flags; > + int shm = -1; > + int mmap_flags = MAP_SHARED; > odp_shm_block_t *block; > void *addr; > #ifdef MAP_HUGETLB > @@ -107,8 +113,20 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) > page_sz = odp_sys_page_size(); > #endif > > + printf("pid %d: %s() %s size %lld, flag %d\n", > + getpid(), __func__, name, (unsigned long long)size, flag); > + > odp_spinlock_lock(&odp_shm_tbl->lock); > > + /* if object was already created return it's address */ > + if (flag == ODP_SHM_PROC_NOCREAT) { > + for (i = 0; i < ODP_SHM_NUM_BLOCKS; i++) { > + if (strcmp(name, odp_shm_tbl->block[i].name) == 0) { > + return odp_shm_tbl->block[i].addr; > + } > + } > + } > + > if (find_block(name) >= 0) { > /* Found a block with the same name */ > odp_spinlock_unlock(&odp_shm_tbl->lock); > @@ -123,8 +141,8 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) > } > > if (i > ODP_SHM_NUM_BLOCKS - 1) { > - /* Table full */ > odp_spinlock_unlock(&odp_shm_tbl->lock); > + ODP_ERR("ODP_SHM_NUM_BLOCKS table is full"); > return NULL; > } > > @@ -133,19 +151,42 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) > addr = MAP_FAILED; > block->huge = 0; > > + if (flag != ODP_SHM_THREAD) { > + shm_open_flags = O_RDWR; > + if (flag == ODP_SHM_PROC) > + shm_open_flags |= O_CREAT; > + > + shm = shm_open(name, shm_open_flags, S_IRUSR | S_IWUSR); > + if (shm == -1) { > + odp_spinlock_unlock(&odp_shm_tbl->lock); > + ODP_ERR("shm_open failed"); > + return NULL; > + } > + > + ret = ftruncate(shm, size + align); > + if (ret == -1) { > + odp_spinlock_unlock(&odp_shm_tbl->lock); > + if (flag != ODP_SHM_PROC_NOCREAT) > + shm_unlink(name); > + ODP_ERR("ftruncate failed"); > + return NULL; > + } > + } else { > + mmap_flags |= MAP_ANONYMOUS; > + } > + > #ifdef MAP_HUGETLB > /* Try first huge pages */ > if (huge_sz && (size + align) > page_sz) { > addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE, > - SHM_FLAGS | MAP_HUGETLB, -1, 0); > + mmap_flags | MAP_HUGETLB, shm, 0); > } > #endif > > /* Use normal pages for small or failed huge page allocations */ > if (addr == MAP_FAILED) { > addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE, > - SHM_FLAGS, -1, 0); > - > + mmap_flags, shm, 0); > } else { > block->huge = 1; > } > @@ -153,6 +194,9 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) > if (addr == MAP_FAILED) { > /* Alloc failed */ > odp_spinlock_unlock(&odp_shm_tbl->lock); > + if (flag != ODP_SHM_PROC_NOCREAT) > + shm_unlink(name); > + ODP_ERR("MAP_FAILED\n"); > return NULL; > } > > @@ -192,6 +236,17 @@ void *odp_shm_lookup(const char *name) > return addr; > } > > +int odp_shm_lookup_ipc(const char *name) > +{ > + int shm; > + > + shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); > + if (shm == -1) > + return 0; > + > + close(shm); > + return 1; > +} > > void odp_shm_print_all(void) > { > diff --git a/test/api_test/odp_shm_test.c b/test/api_test/odp_shm_test.c > index 318d662..fc448a4 100644 > --- a/test/api_test/odp_shm_test.c > +++ b/test/api_test/odp_shm_test.c > @@ -47,7 +47,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED) > odp_print_system_info(); > > test_shared_data = odp_shm_reserve("test_shared_data", > - sizeof(test_shared_data_t), 128); > + sizeof(test_shared_data_t), 128, > + ODP_SHM_THREAD); > memset(test_shared_data, 0, sizeof(test_shared_data_t)); > printf("test shared data at %p\n\n", test_shared_data); > > diff --git a/test/api_test/odp_timer_ping.c b/test/api_test/odp_timer_ping.c > index c1cc255..88f830f 100644 > --- a/test/api_test/odp_timer_ping.c > +++ b/test/api_test/odp_timer_ping.c > @@ -328,7 +328,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED) > * Create message pool > */ > pool_base = odp_shm_reserve("msg_pool", > - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); > + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, > + ODP_SHM_THREAD); > > pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, > BUF_SIZE, > -- > 1.8.5.1.163.gd7aced9 > > > _______________________________________________ > lng-odp mailing list > lng-odp@lists.linaro.org > http://lists.linaro.org/mailman/listinfo/lng-odp
diff --git a/.gitignore b/.gitignore index 39c8d77..7eca389 100644 --- a/.gitignore +++ b/.gitignore @@ -5,11 +5,15 @@ *.patch *~ *.lo +*.swp +*.swo +.dirstamp Makefile Makefile.in aclocal.m4 autom4te.cache/ compile +core config.guess config.sub configure @@ -32,6 +36,7 @@ lib/ obj/ build/ odp_example +odp_ipc odp_packet odp_packet_netmap odp_atomic diff --git a/configure.ac b/configure.ac index 6b75e66..4f4a913 100644 --- a/configure.ac +++ b/configure.ac @@ -119,6 +119,7 @@ AC_CONFIG_FILES([Makefile platform/linux-keystone2/Makefile platform/linux-dpdk/Makefile example/Makefile + example/ipc/Makefile example/generator/Makefile example/l2fwd/Makefile example/odp_example/Makefile diff --git a/example/Makefile.am b/example/Makefile.am index 01a3305..1a5a138 100644 --- a/example/Makefile.am +++ b/example/Makefile.am @@ -1 +1 @@ -SUBDIRS = generator l2fwd odp_example packet packet_netmap timer +SUBDIRS = generator l2fwd odp_example packet packet_netmap timer ipc diff --git a/example/generator/odp_generator.c b/example/generator/odp_generator.c index b10372e..102102f 100644 --- a/example/generator/odp_generator.c +++ b/example/generator/odp_generator.c @@ -542,7 +542,8 @@ int main(int argc, char *argv[]) odp_atomic_init_u64(&counters.icmp); /* Reserve memory for args from shared mem */ - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (args == NULL) { ODP_ERR("Error: shared mem alloc failed.\n"); exit(EXIT_FAILURE); @@ -587,7 +588,8 @@ int main(int argc, char *argv[]) /* Create packet pool */ pool_base = odp_shm_reserve("shm_packet_pool", - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (pool_base == NULL) { ODP_ERR("Error: packet pool mem alloc failed.\n"); exit(EXIT_FAILURE); diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am new file mode 100644 index 0000000..2fd48f7 --- /dev/null +++ b/example/ipc/Makefile.am @@ -0,0 +1,6 @@ +include $(top_srcdir)/example/Makefile.inc + +bin_PROGRAMS = odp_ipc +odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static + +dist_odp_ipc_SOURCES = odp_ipc.c diff --git a/example/ipc/README b/example/ipc/README new file mode 100644 index 0000000..34f56d7 --- /dev/null +++ b/example/ipc/README @@ -0,0 +1,46 @@ + ODP IPC example + +This example shows how to use queues to excahnge packets between different +processes. + +Example burst mode: +./odp_fork -i eth0 -m 1 -c 1 +On remote host run ping. + +[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3 +11490 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4 + + +Main PID/thread [11492/1] enqueues packets to IPC queue with odp_queue_enq_multi(), +child process thread ring_thread() dequeues packets from ipc queue. + + +Example queue mode: + +./odp_fork -i eth0 -m 1 -c 1 +waiting for packet... +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... +15917 no valid buffer + ring_thread() got buffer from IPC queue size 98/1856 +Enqueue the packet to ipc queue size 98/1856 +waiting for packet... + +Thread 15917 moves packets from ingress queue to IPC queue. Other process +in ring_thread() thread dequeues packets from IPC queue. diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c new file mode 100644 index 0000000..e10874e --- /dev/null +++ b/example/ipc/odp_ipc.c @@ -0,0 +1,685 @@ +/* Copyright (c) 2014, Linaro Limited + * All rights reserved. + * + * SPDX-License-Identifier: BSD-3-Clause + */ + +/** + * @file + * + * @example odp_ipc.c ODP IPC queues example application + */ + +#include <stdlib.h> +#include <string.h> +#include <getopt.h> +#include <unistd.h> + +#include <odp.h> +#include <helper/odp_linux.h> +#include <helper/odp_packet_helper.h> +#include <helper/odp_eth.h> +#include <helper/odp_ip.h> +#include <helper/odp_ring.h> + +#define MAX_WORKERS 32 +#define SHM_PKT_POOL_SIZE (512*2048) +#define SHM_PKT_POOL_BUF_SIZE 1856 +#define MAX_PKT_BURST 16 + +#define APPL_MODE_PKT_BURST 0 +#define APPL_MODE_PKT_QUEUE 1 + +#define RING_SIZE 4096 +#define ODP_RING_NAMESIZE 32 + +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x)) + +/** Get rid of path in filename - only for unix-type paths using '/' */ +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \ + strrchr((file_name), '/') + 1 : (file_name)) +/** + * Parsed command line application arguments + */ +typedef struct { + int core_count; + int if_count; /**< Number of interfaces to be used */ + char **if_names; /**< Array of pointers to interface names */ + int mode; /**< Packet IO mode */ + int type; /**< Packet IO type */ + int fanout; /**< Packet IO fanout */ + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ +} appl_args_t; + +/** + * Thread specific arguments + */ +typedef struct { + char *pktio_dev; /**< Interface name to use */ + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */ + int mode; /**< Thread mode */ + int type; /**< Thread i/o type */ + int fanout; /**< Thread i/o fanout */ + int tpid; +} thread_args_t; + +/** + * Grouping of both parsed CL args and thread specific args - alloc together + */ +typedef struct { + /** Application (parsed) arguments */ + appl_args_t appl; + /** Thread specific arguments */ + thread_args_t thread[MAX_WORKERS]; +} args_t; + +/** Global pointer to args */ +static args_t *args; + +/* helper funcs */ +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len); +static void parse_args(int argc, char *argv[], appl_args_t *appl_args); +static void print_info(char *progname, appl_args_t *appl_args); +static void usage(char *progname); + +static void *ring_thread(void *arg) +{ + thread_args_t *thr_args; + thr_args = arg; + int ret; + odp_buffer_t buf; + odp_buffer_pool_t pkt_pool; + odp_pktio_params_t pktio_ipc_params; + odp_pktio_t pktio_ipc; + odp_queue_t ipcq_def; + + printf("ODP RING THREAD PID %d\n" ,getpid()); + + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID) { + ODP_ERR("Error: pkt_pool not found\n"); + return NULL; + } + + /* create shared queue between processes*/ + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR("Error: pktio create failed\n"); + return NULL; + } + + if (thr_args->tpid) { + while (1) { + ipcq_def = odp_queue_lookup("shared-queue"); + if (ipcq_def != ODP_QUEUE_INVALID) { + printf("%s() shared-queue found\n", __func__); + break; + } + sleep(1); + } + + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def); + if (ret != 0) { + ODP_ERR("Error: slave thread default ipc-Q setup\n"); + return NULL; + } + + /* In loop take packets from ipc queue and free this buffer */ + while (1) { + buf = odp_queue_deq(ipcq_def); + if (!odp_buffer_is_valid(buf)) { + sleep(1); + printf("%d no valid buffer\n", getpid()); + continue; + } + + //buf = odp_schedule(NULL, ODP_SCHED_WAIT); + + printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n", __func__, + (unsigned long)odp_packet_get_len(buf), + (unsigned long)odp_buffer_size(buf)); + odp_buffer_free(buf); + } + } + + /* unreachable */ + return NULL; +} + + +/** + * Packet IO loopback worker thread using ODP queues + * + * @param arg thread arguments of type 'thread_args_t *' + */ +static void *pktio_queue_thread(void *arg) +{ + int thr; + odp_buffer_pool_t pkt_pool; + odp_pktio_t pktio; + odp_pktio_t pktio_ipc; + thread_args_t *thr_args; + odp_queue_t inq_def; + odp_queue_t ipcq_def; + char inq_name[ODP_QUEUE_NAME_LEN]; + odp_queue_param_t qparam; + odp_buffer_t buf; + int ret; + odp_pktio_params_t params; + odp_pktio_params_t pktio_ipc_params; + socket_params_t *sock_params = ¶ms.sock_params; + + thr_args = arg; + + thr = odp_thread_id(); + + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, + thr_args->pktio_dev); + + /* lookup ring from its name */ + /* Lookup the packet pool */ + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); + return NULL; + } + + /* Open a packet IO instance for this thread */ + sock_params->type = thr_args->type; + sock_params->fanout = thr_args->fanout; + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); + if (pktio == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + + /* + * Create and set the default INPUT queue associated with the 'pktio' + * resource + */ + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; + + inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam); + if (inq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + ret = odp_pktio_inq_setdef(pktio, inq_def); + if (ret != 0) { + ODP_ERR(" [%02i] Error: default input-Q setup\n", thr); + return NULL; + } + + printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n" + " default pktio%02i-INPUT queue:%u\n", + thr, pktio, pktio, inq_def); + + /* create shared queue between processes*/ + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC, &qparam); + if (ipcq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + /* In loop take packets from inq queue and put them to ipc queue */ + for (;;) { + /* Use schedule to get buf from any input queue */ + printf("waiting for packet...\n"); + buf = odp_schedule(NULL, ODP_SCHED_WAIT); + + printf("Enqueue the packet to ipc queue size %ld/%ld\n", + (unsigned long)odp_packet_get_len(buf), + (unsigned long)odp_buffer_size(buf)); + + odp_queue_enq(ipcq_def, buf); + } + +/* unreachable */ +} + +/** + * Packet IO loopback worker thread using bursts from/to IO resources + * + * @param arg thread arguments of type 'thread_args_t *' + */ +static void *pktio_ifburst_thread(void *arg) +{ + int thr; + odp_buffer_pool_t pkt_pool; + odp_pktio_t pktio; + thread_args_t *thr_args; + int pkts, pkts_ok; + odp_packet_t pkt_tbl[MAX_PKT_BURST]; + unsigned long pkt_cnt = 0; + unsigned long err_cnt = 0; + odp_pktio_params_t params; + socket_params_t *sock_params = ¶ms.sock_params; + int ret; + + odp_pktio_t pktio_ipc; + odp_queue_t ipcq_def; + char inq_name[ODP_QUEUE_NAME_LEN]; + odp_queue_param_t qparam; + odp_pktio_params_t pktio_ipc_params; + + thr = odp_thread_id(); + thr_args = arg; + + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr, + thr_args->pktio_dev); + + /* Lookup the packet pool */ + pkt_pool = odp_buffer_pool_lookup("packet_pool"); + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) { + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr); + return NULL; + } + + /* Open a packet IO instance for this thread */ + sock_params->type = thr_args->type; + sock_params->fanout = thr_args->fanout; + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms); + if (pktio == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed.\n", thr); + return NULL; + } + + printf(" [%02i] created pktio:%02i, burst mode\n", + thr, pktio); + + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC; + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params); + if (pktio_ipc == ODP_PKTIO_INVALID) { + ODP_ERR(" [%02i] Error: pktio create failed\n", thr); + return NULL; + } + + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT; + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC; + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT; + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio); + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0'; + + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC, &qparam); + if (ipcq_def == ODP_QUEUE_INVALID) { + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr); + return NULL; + } + + /* Loop packets */ + for (;;) { + pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST); + if (pkts > 0) { + /* Drop packets with errors */ + pkts_ok = drop_err_pkts(pkt_tbl, pkts); + if (pkts_ok > 0) { + ret = odp_queue_enq_multi(ipcq_def, pkt_tbl, pkts_ok); + pkt_cnt += pkts_ok; + if (ret != 0) { + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + } else { + printf("[%d/%d] enqueue %d packets, first buf %d size %ld/%ld, cnt %lu\n", + getpid(), thr, pkts_ok, + pkt_tbl[0], + (unsigned long)odp_packet_get_len(pkt_tbl[0]), + (unsigned long)odp_buffer_size(pkt_tbl[0]), + pkt_cnt); + } + } + + if (odp_unlikely(pkts_ok != pkts)) + ODP_ERR("Dropped frames:%u - err_cnt:%lu\n", + pkts-pkts_ok, ++err_cnt); + } + } + +/* unreachable */ +} + +/** + * ODP packet example main function + */ +int main(int argc, char *argv[]) +{ + odp_linux_pthread_t thread_tbl[MAX_WORKERS]; + odp_buffer_pool_t pool; + int thr_id; + int num_workers; + void *pool_base; + int i; + int first_core; + int core_count; + + /* Init ODP before calling anything else */ + if (odp_init_global()) { + ODP_ERR("Error: ODP global init failed.\n"); + exit(EXIT_FAILURE); + } + + args = malloc(sizeof(args_t)); + if (args == NULL) { + ODP_ERR("Error: shared mem alloc failed.\n"); + exit(EXIT_FAILURE); + } + memset(args, 0, sizeof(*args)); + + /* Parse and store the application arguments */ + parse_args(argc, argv, &args->appl); + + /* Print both system and application information */ + print_info(NO_PATH(argv[0]), &args->appl); + + core_count = odp_sys_core_count(); + num_workers = core_count; + + if (args->appl.core_count) + num_workers = args->appl.core_count; + + if (num_workers > MAX_WORKERS) + num_workers = MAX_WORKERS; + + printf("Num worker threads: %i\n", num_workers); + + /* + * By default core #0 runs Linux kernel background tasks. + * Start mapping thread from core #1 + */ + first_core = 1; + + if (core_count == 1) + first_core = 0; + + printf("First core: %i\n\n", first_core); + + /* Init this thread */ + thr_id = odp_thread_create(0); + odp_init_local(thr_id); + + /* Create packet pool */ + pool_base = odp_shm_reserve("shm_packet_pool", + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_PROC); + if (pool_base == NULL) { + ODP_ERR("Error: packet pool mem alloc failed.\n"); + exit(EXIT_FAILURE); + } + + pool = odp_buffer_pool_create("packet_pool", pool_base, + SHM_PKT_POOL_SIZE, + SHM_PKT_POOL_BUF_SIZE, + ODP_CACHE_LINE_SIZE, + ODP_BUFFER_TYPE_PACKET); + if (pool == ODP_BUFFER_POOL_INVALID) { + ODP_ERR("Error: packet pool create failed.\n"); + exit(EXIT_FAILURE); + } + odp_buffer_pool_print(pool); + + + /* Create another process */ + int f = fork(); + + /* Create and init worker threads */ + memset(thread_tbl, 0, sizeof(thread_tbl)); + for (i = 0; i < num_workers; ++i) { + void *(*thr_run_func) (void *); + int core; + int if_idx; + + core = (first_core + i) % core_count; + + if_idx = i % args->appl.if_count; + + args->thread[i].pktio_dev = args->appl.if_names[if_idx]; + args->thread[i].pool = pool; + args->thread[i].mode = args->appl.mode; + args->thread[i].type = args->appl.type; + args->thread[i].fanout = args->appl.fanout; + args->thread[i].tpid = f; + + if (f) { + thr_run_func = ring_thread; + } else { + if (args->appl.mode == APPL_MODE_PKT_BURST) + thr_run_func = pktio_ifburst_thread; + else /* APPL_MODE_PKT_QUEUE */ + thr_run_func = pktio_queue_thread; + } + /* + * Create threads one-by-one instead of all-at-once, + * because each thread might get different arguments. + * Calls odp_thread_create(cpu) for each thread + */ + odp_linux_pthread_create(thread_tbl, 1, core, thr_run_func, + &args->thread[i]); + } + + /* Master thread waits for other threads to exit */ + odp_linux_pthread_join(thread_tbl, num_workers); + + printf("Exit\n\n"); + + return 0; +} + +/** + * Drop packets which input parsing marked as containing errors. + * + * Frees packets with error and modifies pkt_tbl[] to only contain packets with + * no detected errors. + * + * @param pkt_tbl Array of packet + * @param len Length of pkt_tbl[] + * + * @return Number of packets with no detected error + */ +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len) +{ + odp_packet_t pkt; + unsigned pkt_cnt = len; + unsigned i, j; + + for (i = 0, j = 0; i < len; ++i) { + pkt = pkt_tbl[i]; + + if (odp_unlikely(odp_packet_error(pkt))) { + odp_packet_free(pkt); /* Drop */ + pkt_cnt--; + } else if (odp_unlikely(i != j++)) { + pkt_tbl[j-1] = pkt; + } + } + + return pkt_cnt; +} + +/** + * Parse and store the command line arguments + * + * @param argc argument count + * @param argv[] argument vector + * @param appl_args Store application arguments here + */ +static void parse_args(int argc, char *argv[], appl_args_t *appl_args) +{ + int opt; + int long_index; + char *names, *str, *token, *save; + int i; + int len; + static struct option longopts[] = { + {"count", required_argument, NULL, 'c'}, + {"interface", required_argument, NULL, 'i'}, /* return 'i' */ + {"mode", required_argument, NULL, 'm'}, /* return 'm' */ + {"help", no_argument, NULL, 'h'}, /* return 'h' */ + {NULL, 0, NULL, 0} + }; + + appl_args->mode = -1; /* Invalid, must be changed by parsing */ + appl_args->type = 3; /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */ + appl_args->fanout = 1; /* turn off fanout by default for mmap */ + + while (1) { + opt = getopt_long(argc, argv, "+c:i:m:t:f:h", + longopts, &long_index); + + if (opt == -1) + break; /* No more options */ + + switch (opt) { + case 'c': + appl_args->core_count = atoi(optarg); + break; + /* parse packet-io interface names */ + case 'i': + len = strlen(optarg); + if (len == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + len += 1; /* add room for '\0' */ + + names = malloc(len); + if (names == NULL) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* count the number of tokens separated by ',' */ + strcpy(names, optarg); + for (str = names, i = 0;; str = NULL, i++) { + token = strtok_r(str, ",", &save); + if (token == NULL) + break; + } + appl_args->if_count = i; + + if (appl_args->if_count == 0) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + /* allocate storage for the if names */ + appl_args->if_names = + calloc(appl_args->if_count, sizeof(char *)); + + /* store the if names (reset names string) */ + strcpy(names, optarg); + for (str = names, i = 0;; str = NULL, i++) { + token = strtok_r(str, ",", &save); + if (token == NULL) + break; + appl_args->if_names[i] = token; + } + break; + + case 'm': + i = atoi(optarg); + if (i == 0) + appl_args->mode = APPL_MODE_PKT_BURST; + else + appl_args->mode = APPL_MODE_PKT_QUEUE; + break; + + case 't': + appl_args->type = atoi(optarg); + break; + + case 'f': + appl_args->fanout = atoi(optarg); + break; + + case 'h': + usage(argv[0]); + exit(EXIT_SUCCESS); + break; + + default: + break; + } + } + + if (appl_args->if_count == 0 || appl_args->mode == -1) { + usage(argv[0]); + exit(EXIT_FAILURE); + } + + optind = 1; /* reset 'extern optind' from the getopt lib */ +} + +/** + * Print system and application info + */ +static void print_info(char *progname, appl_args_t *appl_args) +{ + int i; + + printf("\n" + "ODP system info\n" + "---------------\n" + "ODP API version: %s\n" + "CPU model: %s\n" + "CPU freq (hz): %"PRIu64"\n" + "Cache line size: %i\n" + "Core count: %i\n" + "\n", + odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(), + odp_sys_cache_line_size(), odp_sys_core_count()); + + printf("Running ODP appl: \"%s\"\n" + "-----------------\n" + "IF-count: %i\n" + "Using IFs: ", + progname, appl_args->if_count); + for (i = 0; i < appl_args->if_count; ++i) + printf(" %s", appl_args->if_names[i]); + printf("\n" + "Mode: "); + if (appl_args->mode == APPL_MODE_PKT_BURST) + PRINT_APPL_MODE(APPL_MODE_PKT_BURST); + else + PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE); + printf("\n\n"); + fflush(NULL); +} + +/** + * Prinf usage information + */ +static void usage(char *progname) +{ + printf("\n" + "Usage: %s OPTIONS\n" + " E.g. %s -i eth1,eth2,eth3 -m 0\n" + "\n" + "OpenDataPlane example application.\n" + "\n" + "Mandatory OPTIONS:\n" + " -i, --interface Eth interfaces (comma-separated, no spaces)\n" + " -m, --mode 0: Burst send&receive packets (no queues)\n" + " 1: Send&receive packets through ODP queues.\n" + " -t, --type 1: ODP_PKTIO_TYPE_SOCKET_BASIC\n" + " 2: ODP_PKTIO_TYPE_SOCKET_MMSG\n" + " 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" + " 4: ODP_PKTIO_TYPE_NETMAP\n" + " Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n" + " -f, --fanout 0: off 1: on (Default 1: on)\n" + "\n" + "Optional OPTIONS\n" + " -c, --count <number> Core count.\n" + " -h, --help Display help and exit.\n" + "\n", NO_PATH(progname), NO_PATH(progname) + ); +} diff --git a/example/l2fwd/odp_l2fwd.c b/example/l2fwd/odp_l2fwd.c index f89ea7a..3a78761 100644 --- a/example/l2fwd/odp_l2fwd.c +++ b/example/l2fwd/odp_l2fwd.c @@ -294,7 +294,8 @@ int main(int argc, char *argv[]) } /* Reserve memory for args from shared mem */ - gbl_args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); + gbl_args = odp_shm_reserve("shm_args", sizeof(args_t), + ODP_CACHE_LINE_SIZE, ODP_SHM_THREAD); if (gbl_args == NULL) { ODP_ERR("Error: shared mem alloc failed.\n"); exit(EXIT_FAILURE); @@ -345,7 +346,8 @@ int main(int argc, char *argv[]) /* Create packet pool */ pool_base = odp_shm_reserve("shm_packet_pool", - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (pool_base == NULL) { ODP_ERR("Error: packet pool mem alloc failed.\n"); exit(EXIT_FAILURE); diff --git a/example/odp_example/odp_example.c b/example/odp_example/odp_example.c index be96093..6d075b2 100644 --- a/example/odp_example/odp_example.c +++ b/example/odp_example/odp_example.c @@ -987,7 +987,8 @@ int main(int argc, char *argv[]) * Create message pool */ pool_base = odp_shm_reserve("msg_pool", - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, sizeof(test_message_t), diff --git a/example/packet/odp_pktio.c b/example/packet/odp_pktio.c index 247a28a..c7ff4ef 100644 --- a/example/packet/odp_pktio.c +++ b/example/packet/odp_pktio.c @@ -291,7 +291,8 @@ int main(int argc, char *argv[]) } /* Reserve memory for args from shared mem */ - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE); + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (args == NULL) { ODP_ERR("Error: shared mem alloc failed.\n"); exit(EXIT_FAILURE); @@ -332,7 +333,8 @@ int main(int argc, char *argv[]) /* Create packet pool */ pool_base = odp_shm_reserve("shm_packet_pool", - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE); + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (pool_base == NULL) { ODP_ERR("Error: packet pool mem alloc failed.\n"); exit(EXIT_FAILURE); diff --git a/example/timer/odp_timer_test.c b/example/timer/odp_timer_test.c index dbe0e5b..113200b 100644 --- a/example/timer/odp_timer_test.c +++ b/example/timer/odp_timer_test.c @@ -260,7 +260,8 @@ int main(int argc, char *argv[]) * Create message pool */ pool_base = odp_shm_reserve("msg_pool", - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, 0, diff --git a/include/helper/odp_ring.h b/include/helper/odp_ring.h index 0911531..60960a2 100644 --- a/include/helper/odp_ring.h +++ b/include/helper/odp_ring.h @@ -158,6 +158,8 @@ typedef struct odp_ring { #define ODP_RING_F_SP_ENQ 0x0001 /* The default enqueue is "single-producer". */ #define ODP_RING_F_SC_DEQ 0x0002 /* The default dequeue is "single-consumer". */ +#define ODP_RING_SHM_PROC 0x0004 /* If set - ring is visible from different + processes. Default is thread visible. */ #define ODP_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */ #define ODP_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */ diff --git a/include/odp_queue.h b/include/odp_queue.h index 5e083f1..4700a62 100644 --- a/include/odp_queue.h +++ b/include/odp_queue.h @@ -44,6 +44,8 @@ typedef int odp_queue_type_t; #define ODP_QUEUE_TYPE_POLL 1 /**< Not scheduled queue */ #define ODP_QUEUE_TYPE_PKTIN 2 /**< Packet input queue */ #define ODP_QUEUE_TYPE_PKTOUT 3 /**< Packet output queue */ +#define ODP_QUEUE_TYPE_IPC 4 /**< Packet ipc queue */ +#define ODP_QUEUE_TYPE_IPC_LOOKUP 5 /**< Packet ipc queue */ /** * ODP schedule priority diff --git a/include/odp_shared_memory.h b/include/odp_shared_memory.h index 8ac8847..1d8e8bb 100644 --- a/include/odp_shared_memory.h +++ b/include/odp_shared_memory.h @@ -24,6 +24,13 @@ extern "C" { /** Maximum shared memory block name lenght in chars */ #define ODP_SHM_NAME_LEN 32 +typedef enum { + ODP_SHM_THREAD = 1, /**< Memory accessible by threads. */ + ODP_SHM_PROC = 2, /**< Memory accessible by processes. + Will be created if not exist. */ + ODP_SHM_PROC_NOCREAT = 3, /**< Memory accessible by processes. + Has to be created before usage.*/ +} odp_shm_e; /** * Reserve a block of shared memory @@ -31,10 +38,12 @@ extern "C" { * @param name Name of the block (maximum ODP_SHM_NAME_LEN - 1 chars) * @param size Block size in bytes * @param align Block alignment in bytes + * @param flag Flags for shared memory creation * * @return Pointer to the reserved block, or NULL */ -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align); +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align, + odp_shm_e flag); /** * Lookup for a block of shared memory @@ -44,6 +53,7 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align); * @return Pointer to the block, or NULL */ void *odp_shm_lookup(const char *name); +int odp_shm_lookup_ipc(const char *name); /** diff --git a/platform/linux-generic/include/api/odp_pktio_types.h b/platform/linux-generic/include/api/odp_pktio_types.h index 8d195a5..e8e27cc 100644 --- a/platform/linux-generic/include/api/odp_pktio_types.h +++ b/platform/linux-generic/include/api/odp_pktio_types.h @@ -21,6 +21,7 @@ typedef enum { ODP_PKTIO_TYPE_SOCKET_MMSG, ODP_PKTIO_TYPE_SOCKET_MMAP, ODP_PKTIO_TYPE_NETMAP, + ODP_PKTIO_TYPE_IPC, } odp_pktio_type_t; #include <odp_pktio_socket.h> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h b/platform/linux-generic/include/odp_packet_io_internal.h index 881cc5f..77fff96 100644 --- a/platform/linux-generic/include/odp_packet_io_internal.h +++ b/platform/linux-generic/include/odp_packet_io_internal.h @@ -35,6 +35,7 @@ struct pktio_entry { #ifdef ODP_HAVE_NETMAP pkt_netmap_t pkt_nm; /**< using netmap API for IO */ #endif + odp_buffer_pool_t pool; /**< reference to packet pool */ }; typedef union { diff --git a/platform/linux-generic/include/odp_queue_internal.h b/platform/linux-generic/include/odp_queue_internal.h index 8b6c517..f536331 100644 --- a/platform/linux-generic/include/odp_queue_internal.h +++ b/platform/linux-generic/include/odp_queue_internal.h @@ -23,6 +23,7 @@ extern "C" { #include <odp_packet_io.h> #include <odp_align.h> +#include <helper/odp_ring.h> #define USE_TICKETLOCK @@ -39,6 +40,8 @@ extern "C" { #define QUEUE_STATUS_NOTSCHED 2 #define QUEUE_STATUS_SCHED 3 +#define QUEUE_IPC_ENTRIES 4096 /**< number of odp buffers in odp ring queue */ + /* forward declaration */ union queue_entry_u; @@ -65,13 +68,14 @@ struct queue_entry_s { deq_func_t dequeue; enq_multi_func_t enqueue_multi; deq_multi_func_t dequeue_multi; - odp_queue_t handle; odp_buffer_t sched_buf; odp_queue_type_t type; odp_queue_param_t param; odp_pktio_t pktin; odp_pktio_t pktout; + odp_ring_t *r; /* ring ref */ + odp_buffer_t **r_p; /* ring memory */ char name[ODP_QUEUE_NAME_LEN]; }; @@ -84,10 +88,16 @@ typedef union queue_entry_u { queue_entry_t *get_qentry(uint32_t queue_id); int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr); + odp_buffer_hdr_t *queue_deq(queue_entry_t *queue); +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue); int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); + int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num); void queue_lock(queue_entry_t *queue); void queue_unlock(queue_entry_t *queue); diff --git a/platform/linux-generic/odp_buffer_pool.c b/platform/linux-generic/odp_buffer_pool.c index a48781a..9157994 100644 --- a/platform/linux-generic/odp_buffer_pool.c +++ b/platform/linux-generic/odp_buffer_pool.c @@ -102,7 +102,8 @@ int odp_buffer_pool_init_global(void) pool_tbl = odp_shm_reserve("odp_buffer_pools", sizeof(pool_table_t), - sizeof(pool_entry_t)); + sizeof(pool_entry_t), + ODP_SHM_THREAD); if (pool_tbl == NULL) return -1; diff --git a/platform/linux-generic/odp_packet_io.c b/platform/linux-generic/odp_packet_io.c index 33ade10..6672d6b 100644 --- a/platform/linux-generic/odp_packet_io.c +++ b/platform/linux-generic/odp_packet_io.c @@ -55,7 +55,8 @@ int odp_pktio_init_global(void) pktio_tbl = odp_shm_reserve("odp_pktio_entries", sizeof(pktio_table_t), - sizeof(pktio_entry_t)); + sizeof(pktio_entry_t), + ODP_SHM_THREAD); if (pktio_tbl == NULL) return -1; @@ -129,6 +130,8 @@ static void init_pktio_entry(pktio_entry_t *entry, odp_pktio_params_t *params) memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm)); break; #endif + case ODP_PKTIO_TYPE_IPC: + break; default: ODP_ERR("Packet I/O type not supported. Please recompile\n"); break; @@ -194,6 +197,8 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, ODP_DBG("Allocating netmap pktio\n"); break; #endif + case ODP_PKTIO_TYPE_IPC: + break; default: ODP_ERR("Invalid pktio type: %02x\n", params->type); return ODP_PKTIO_INVALID; @@ -239,6 +244,9 @@ odp_pktio_t odp_pktio_open(const char *dev, odp_buffer_pool_t pool, } break; #endif + case ODP_PKTIO_TYPE_IPC: + pktio_entry->s.pool = pool; + break; default: free_pktio_entry(id); id = ODP_PKTIO_INVALID; @@ -381,11 +389,23 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) pktio_entry_t *pktio_entry = get_entry(id); queue_entry_t *qentry = queue_to_qentry(queue); - if (pktio_entry == NULL || qentry == NULL) + if (pktio_entry == NULL || qentry == NULL) { + ODP_ERR("%s() return -q reason %p -- %p\n", + __func__, + pktio_entry, qentry); return -1; + } - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN) + switch (qentry->s.type) + { + case ODP_QUEUE_TYPE_PKTIN: + case ODP_QUEUE_TYPE_IPC: + case ODP_QUEUE_TYPE_IPC_LOOKUP: + break; + default: + ODP_ERR("%s() type is %d\n", __func__, qentry->s.type); return -1; + } lock_entry(pktio_entry); pktio_entry->s.inq_default = queue; @@ -396,6 +416,12 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue) qentry->s.status = QUEUE_STATUS_SCHED; queue_unlock(qentry); + if (qentry->s.type == ODP_QUEUE_TYPE_IPC) + return 0; + if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP) + return 0; + + odp_schedule_queue(queue, qentry->s.param.sched.prio); return 0; diff --git a/platform/linux-generic/odp_queue.c b/platform/linux-generic/odp_queue.c index c637bdf..d08e72e 100644 --- a/platform/linux-generic/odp_queue.c +++ b/platform/linux-generic/odp_queue.c @@ -21,6 +21,10 @@ #include <odp_hints.h> #include <odp_sync.h> +#include <helper/odp_ring.h> +#include <sys/types.h> +#include <unistd.h> + #ifdef USE_TICKETLOCK #include <odp_ticketlock.h> #define LOCK(a) odp_ticketlock_lock(a) @@ -34,7 +38,7 @@ #endif #include <string.h> - +#include <stdlib.h> typedef struct queue_table_t { queue_entry_t queue[ODP_CONFIG_QUEUES]; @@ -77,6 +81,41 @@ static void queue_init(queue_entry_t *queue, const char *name, queue->s.enqueue_multi = pktout_enq_multi; queue->s.dequeue_multi = pktout_deq_multi; break; + case ODP_QUEUE_TYPE_IPC: + queue->s.r = odp_ring_lookup(name); + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES * + sizeof(odp_buffer_t)); + if (!queue->s.r) + { + queue->s.r = odp_ring_create(name, QUEUE_IPC_ENTRIES, ODP_RING_SHM_PROC); + if (queue->s.r == NULL) { + ODP_ERR("ring create failed\n"); + } + } + queue->s.enqueue = queue_enq_ipc; + queue->s.dequeue = queue_deq_ipc; + queue->s.enqueue_multi = queue_enq_multi_ipc; + queue->s.dequeue_multi = queue_deq_multi_ipc; + break; + case ODP_QUEUE_TYPE_IPC_LOOKUP: + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES * + sizeof(odp_buffer_t)); + if (odp_shm_lookup_ipc(name) == 1) + { + size_t ring_size = QUEUE_IPC_ENTRIES * sizeof(void *) + + sizeof(odp_ring_t); + queue->s.r = odp_shm_reserve(name, ring_size, + ODP_CACHE_LINE_SIZE, + ODP_SHM_PROC_NOCREAT); + if (queue->s.r == NULL) { + ODP_ERR("LOOKUP ring create failed\n"); + } + } + queue->s.enqueue = queue_enq_ipc; + queue->s.dequeue = queue_deq_ipc; + queue->s.enqueue_multi = queue_enq_multi_ipc; + queue->s.dequeue_multi = queue_deq_multi_ipc; + break; default: queue->s.enqueue = queue_enq; queue->s.dequeue = queue_deq; @@ -99,7 +138,8 @@ int odp_queue_init_global(void) queue_tbl = odp_shm_reserve("odp_queues", sizeof(queue_table_t), - sizeof(queue_entry_t)); + sizeof(queue_entry_t), + ODP_SHM_THREAD); if (queue_tbl == NULL) return -1; @@ -113,6 +153,11 @@ int odp_queue_init_global(void) queue->s.handle = queue_from_id(i); } + /* for linux-generic IPC queue implemented totaly in + * software using odp_ring. + */ + odp_ring_tailq_init(); + ODP_DBG("done\n"); ODP_DBG("Queue init global\n"); ODP_DBG(" struct queue_entry_s size %zu\n", @@ -243,6 +288,27 @@ odp_queue_t odp_queue_lookup(const char *name) UNLOCK(&queue->s.lock); } + /* do look up for shared memory object if exist return that queue*/ + odp_ring_t *r; + + r = odp_ring_lookup(name); + if (r == NULL) { + if ( odp_shm_lookup_ipc(name) == 1) { + /* Create local IPC queue connected to shm object */ + odp_queue_t q = odp_queue_create(name, + ODP_QUEUE_TYPE_IPC_LOOKUP, NULL); + if (q != ODP_QUEUE_INVALID) { + return q; + } + } + } else { + /* odp ring is in odp_ring_list. That means current process already created + * link with such name. That might be ipc queue or ring itself. For now + * print error here + */ + ODP_ERR("odp ring with name: \"%s\" already initialized\n", name); + } + return ODP_QUEUE_INVALID; } @@ -276,6 +342,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) return 0; } +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr) +{ + int ret; + odp_ring_t *r = queue->s.r; + odp_buffer_bits_t handle; + uint32_t index = buf_hdr->handle.index; + uint32_t pool_id = buf_hdr->handle.pool; + odp_buffer_t buf; + void **rbuf_p; + + /* get buffer from buf_hdr */ + handle.index = index; + handle.pool = pool_id; + + buf = handle.u32; + + rbuf_p = (void*)&buf; + /* use odp_ring locks instead of per process queue lock + * LOCK(&queue->s.lock); + */ + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr + * here due to poiter will be referenced in different porocess + */ + ret = odp_ring_mp_enqueue_bulk(r, rbuf_p, 1); + if (ret != 0) + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + /* + * UNLOCK(&queue->s.lock); + */ + return 0; +} + int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { @@ -311,6 +409,43 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) return 0; } +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i; + int ret = 0; + odp_ring_t *r = queue->s.r; + odp_buffer_bits_t handle; + odp_buffer_t buf; + void **rbuf_p; + + /* use odp_ring locks instead of per process queue lock + * LOCK(&queue->s.lock); + */ + + /* odp_buffer_t buffers can be in not continius memory, + * so queue them to IPC ring one by one. + */ + for (i = 0; i < num; i++) { + handle.index = buf_hdr[i]->handle.index; + handle.pool = buf_hdr[i]->handle.pool; + + buf = handle.u32; + + rbuf_p = (void*)&buf; + + /* queue buffer to the ring. Note: we can't use pointer to buf_hdr + * here due to poiter will be referenced in different porocess + */ + ret += odp_ring_mp_enqueue_bulk(r, rbuf_p, 1); + if (ret != 0) + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n"); + } + /* + * UNLOCK(&queue->s.lock); + */ + + return ret; +} int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num) { @@ -369,6 +504,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue) return buf_hdr; } +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue) +{ + odp_buffer_hdr_t *buf_hdr = NULL; + odp_ring_t *r = queue->s.r; + int ret; + odp_buffer_t buf; + + /* using odp_ring lock + * LOCK(&queue->s.lock); + */ + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, 1); + if (ret == 0) { + memcpy(&buf, (void *)queue->s.r_p, + sizeof(odp_buffer_t)); + buf_hdr = odp_buf_to_hdr(buf); + } + /* + * UNLOCK(&queue->s.lock); + */ + + return buf_hdr; +} + +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) +{ + int i = 0; + odp_ring_t *r = queue->s.r; + int ret; + odp_buffer_t buf; + + /* use odp ring lock + * LOCK(&queue->s.lock); + */ + + if (queue->s.head == NULL) { + /* Already empty queue */ + } else { + odp_buffer_hdr_t *hdr = queue->s.head; + + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, num); + if (ret == 0) { + for (; i < num && hdr; i++) { + memcpy(&buf, (void *)queue->s.r_p[i], + sizeof(odp_buffer_t)); + + buf_hdr[i] = odp_buf_to_hdr(buf); + hdr = hdr->next; + buf_hdr[i]->next = NULL; + } + + } + + queue->s.head = hdr; + + if (hdr == NULL) { + /* Queue is now empty */ + queue->s.tail = NULL; + } + } + + /* use odp_ring lock + * UNLOCK(&queue->s.lock); + */ + + return i; +} int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num) { diff --git a/platform/linux-generic/odp_ring.c b/platform/linux-generic/odp_ring.c index 25ff66a..40df789 100644 --- a/platform/linux-generic/odp_ring.c +++ b/platform/linux-generic/odp_ring.c @@ -82,6 +82,9 @@ #include <odp_rwlock.h> #include <helper/odp_ring.h> +#include <sys/types.h> +#include <unistd.h> + static TAILQ_HEAD(, odp_ring) odp_ring_list; /* @@ -158,6 +161,12 @@ odp_ring_create(const char *name, unsigned count, unsigned flags) char ring_name[ODP_RING_NAMESIZE]; odp_ring_t *r; size_t ring_size; + odp_shm_e shm_flag; + + if (flags & ODP_RING_SHM_PROC) + shm_flag = ODP_SHM_PROC; + else + shm_flag = ODP_SHM_THREAD; /* count must be a power of 2 */ if (!ODP_VAL_IS_POWER_2(count) || (count > ODP_RING_SZ_MASK)) { @@ -171,7 +180,8 @@ odp_ring_create(const char *name, unsigned count, unsigned flags) odp_rwlock_write_lock(&qlock); /* reserve a memory zone for this ring.*/ - r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE); + r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE, + shm_flag); if (r != NULL) { /* init the ring structure */ @@ -545,12 +555,14 @@ void odp_ring_list_dump(void) /* search a ring from its name */ odp_ring_t *odp_ring_lookup(const char *name) { - odp_ring_t *r = odp_shm_lookup(name); + odp_ring_t *r; odp_rwlock_read_lock(&qlock); TAILQ_FOREACH(r, &odp_ring_list, next) { - if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) - break; + if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) { + odp_rwlock_read_unlock(&qlock); + return r; + } } odp_rwlock_read_unlock(&qlock); diff --git a/platform/linux-generic/odp_schedule.c b/platform/linux-generic/odp_schedule.c index 9e399f1..9e76692 100644 --- a/platform/linux-generic/odp_schedule.c +++ b/platform/linux-generic/odp_schedule.c @@ -89,7 +89,8 @@ int odp_schedule_init_global(void) sched = odp_shm_reserve("odp_scheduler", sizeof(sched_t), - ODP_CACHE_LINE_SIZE); + ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); if (sched == NULL) { ODP_ERR("Schedule init: Shm reserve failed.\n"); @@ -98,7 +99,8 @@ int odp_schedule_init_global(void) pool_base = odp_shm_reserve("odp_sched_pool", - SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE); + SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); pool = odp_buffer_pool_create("odp_sched_pool", pool_base, SCHED_POOL_SIZE, sizeof(queue_desc_t), diff --git a/platform/linux-generic/odp_shared_memory.c b/platform/linux-generic/odp_shared_memory.c index 784f42b..ae53bb6 100644 --- a/platform/linux-generic/odp_shared_memory.c +++ b/platform/linux-generic/odp_shared_memory.c @@ -14,10 +14,14 @@ #include <sys/mman.h> #include <asm/mman.h> #include <fcntl.h> +#include <unistd.h> +#include <sys/types.h> #include <stdio.h> #include <string.h> +#include <helper/odp_ring.h> +#include <stdlib.h> #define ODP_SHM_NUM_BLOCKS 32 @@ -59,9 +63,8 @@ int odp_shm_init_global(void) ODP_DBG("NOTE: mmap does not support huge pages\n"); #endif - addr = mmap(NULL, sizeof(odp_shm_table_t), - PROT_READ | PROT_WRITE, SHM_FLAGS, -1, 0); - + /* malloc instead of mmap to bind table to process. */ + addr = malloc(sizeof(odp_shm_table_t)); if (addr == MAP_FAILED) return -1; @@ -95,9 +98,12 @@ static int find_block(const char *name) } -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align, + odp_shm_e flag) { - int i; + int i, ret, shm_open_flags; + int shm = -1; + int mmap_flags = MAP_SHARED; odp_shm_block_t *block; void *addr; #ifdef MAP_HUGETLB @@ -107,8 +113,20 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) page_sz = odp_sys_page_size(); #endif + printf("pid %d: %s() %s size %lld, flag %d\n", + getpid(), __func__, name, (unsigned long long)size, flag); + odp_spinlock_lock(&odp_shm_tbl->lock); + /* if object was already created return it's address */ + if (flag == ODP_SHM_PROC_NOCREAT) { + for (i = 0; i < ODP_SHM_NUM_BLOCKS; i++) { + if (strcmp(name, odp_shm_tbl->block[i].name) == 0) { + return odp_shm_tbl->block[i].addr; + } + } + } + if (find_block(name) >= 0) { /* Found a block with the same name */ odp_spinlock_unlock(&odp_shm_tbl->lock); @@ -123,8 +141,8 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) } if (i > ODP_SHM_NUM_BLOCKS - 1) { - /* Table full */ odp_spinlock_unlock(&odp_shm_tbl->lock); + ODP_ERR("ODP_SHM_NUM_BLOCKS table is full"); return NULL; } @@ -133,19 +151,42 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) addr = MAP_FAILED; block->huge = 0; + if (flag != ODP_SHM_THREAD) { + shm_open_flags = O_RDWR; + if (flag == ODP_SHM_PROC) + shm_open_flags |= O_CREAT; + + shm = shm_open(name, shm_open_flags, S_IRUSR | S_IWUSR); + if (shm == -1) { + odp_spinlock_unlock(&odp_shm_tbl->lock); + ODP_ERR("shm_open failed"); + return NULL; + } + + ret = ftruncate(shm, size + align); + if (ret == -1) { + odp_spinlock_unlock(&odp_shm_tbl->lock); + if (flag != ODP_SHM_PROC_NOCREAT) + shm_unlink(name); + ODP_ERR("ftruncate failed"); + return NULL; + } + } else { + mmap_flags |= MAP_ANONYMOUS; + } + #ifdef MAP_HUGETLB /* Try first huge pages */ if (huge_sz && (size + align) > page_sz) { addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE, - SHM_FLAGS | MAP_HUGETLB, -1, 0); + mmap_flags | MAP_HUGETLB, shm, 0); } #endif /* Use normal pages for small or failed huge page allocations */ if (addr == MAP_FAILED) { addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE, - SHM_FLAGS, -1, 0); - + mmap_flags, shm, 0); } else { block->huge = 1; } @@ -153,6 +194,9 @@ void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align) if (addr == MAP_FAILED) { /* Alloc failed */ odp_spinlock_unlock(&odp_shm_tbl->lock); + if (flag != ODP_SHM_PROC_NOCREAT) + shm_unlink(name); + ODP_ERR("MAP_FAILED\n"); return NULL; } @@ -192,6 +236,17 @@ void *odp_shm_lookup(const char *name) return addr; } +int odp_shm_lookup_ipc(const char *name) +{ + int shm; + + shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR); + if (shm == -1) + return 0; + + close(shm); + return 1; +} void odp_shm_print_all(void) { diff --git a/test/api_test/odp_shm_test.c b/test/api_test/odp_shm_test.c index 318d662..fc448a4 100644 --- a/test/api_test/odp_shm_test.c +++ b/test/api_test/odp_shm_test.c @@ -47,7 +47,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED) odp_print_system_info(); test_shared_data = odp_shm_reserve("test_shared_data", - sizeof(test_shared_data_t), 128); + sizeof(test_shared_data_t), 128, + ODP_SHM_THREAD); memset(test_shared_data, 0, sizeof(test_shared_data_t)); printf("test shared data at %p\n\n", test_shared_data); diff --git a/test/api_test/odp_timer_ping.c b/test/api_test/odp_timer_ping.c index c1cc255..88f830f 100644 --- a/test/api_test/odp_timer_ping.c +++ b/test/api_test/odp_timer_ping.c @@ -328,7 +328,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED) * Create message pool */ pool_base = odp_shm_reserve("msg_pool", - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE); + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE, + ODP_SHM_THREAD); pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE, BUF_SIZE,
Implement odp implementation for linux-generic using standard odp queue API. Signed-off-by: Maxim Uvarov <maxim.uvarov@linaro.org> --- New version of IPC queues. TODO before commit: merge this patch with Petri's change to odp_shm_reserve(). Maxim. .gitignore | 5 + configure.ac | 1 + example/Makefile.am | 2 +- example/generator/odp_generator.c | 6 +- example/ipc/Makefile.am | 6 + example/ipc/README | 46 ++ example/ipc/odp_ipc.c | 685 +++++++++++++++++++++ example/l2fwd/odp_l2fwd.c | 6 +- example/odp_example/odp_example.c | 3 +- example/packet/odp_pktio.c | 6 +- example/timer/odp_timer_test.c | 3 +- include/helper/odp_ring.h | 2 + include/odp_queue.h | 2 + include/odp_shared_memory.h | 12 +- .../linux-generic/include/api/odp_pktio_types.h | 1 + .../linux-generic/include/odp_packet_io_internal.h | 1 + .../linux-generic/include/odp_queue_internal.h | 12 +- platform/linux-generic/odp_buffer_pool.c | 3 +- platform/linux-generic/odp_packet_io.c | 32 +- platform/linux-generic/odp_queue.c | 205 +++++- platform/linux-generic/odp_ring.c | 20 +- platform/linux-generic/odp_schedule.c | 6 +- platform/linux-generic/odp_shared_memory.c | 73 ++- test/api_test/odp_shm_test.c | 3 +- test/api_test/odp_timer_ping.c | 3 +- 25 files changed, 1110 insertions(+), 34 deletions(-) create mode 100644 example/ipc/Makefile.am create mode 100644 example/ipc/README create mode 100644 example/ipc/odp_ipc.c