diff mbox series

[v1,1/3] linux-gen: sched: optimize packet input polling

Message ID 1519920009-23406-2-git-send-email-odpbot@yandex.ru
State Superseded
Headers show
Series [v1,1/3] linux-gen: sched: optimize packet input polling | expand

Commit Message

Github ODP bot March 1, 2018, 4 p.m. UTC
From: Petri Savolainen <petri.savolainen@linaro.org>


Optimize scheduler throughput with packet IO interfaces.
Special pktio poll commands are removed and event queue is
used instead to trigger packet input polling. Packet input is
polled when those queues are empty. Thus, these queues
connected to packet input are not removed from scheduling
when empty.

Signed-off-by: Petri Savolainen <petri.savolainen@linaro.org>

---
/** Email created from pull request 504 (psavol:master-sched-optim-2)
 ** https://github.com/Linaro/odp/pull/504
 ** Patch: https://github.com/Linaro/odp/pull/504.patch
 ** Base sha: 284f52d72ec19df3774c7409780f1f9eea33b8e6
 ** Merge commit sha: 081cdd2f01d5a4e5b5b4d254f689e55d60a489cd
 **/
 platform/linux-generic/include/odp_schedule_if.h |   4 +-
 platform/linux-generic/odp_queue_basic.c         |  24 +-
 platform/linux-generic/odp_schedule_basic.c      | 331 +++++++----------------
 platform/linux-generic/odp_schedule_iquery.c     |   4 +-
 platform/linux-generic/odp_schedule_sp.c         |   2 +-
 5 files changed, 129 insertions(+), 236 deletions(-)
diff mbox series

Patch

diff --git a/platform/linux-generic/include/odp_schedule_if.h b/platform/linux-generic/include/odp_schedule_if.h
index 66e050438..dd2097bfb 100644
--- a/platform/linux-generic/include/odp_schedule_if.h
+++ b/platform/linux-generic/include/odp_schedule_if.h
@@ -82,7 +82,9 @@  int sched_cb_pktin_poll_one(int pktio_index, int rx_queue, odp_event_t evts[]);
 void sched_cb_pktio_stop_finalize(int pktio_index);
 odp_queue_t sched_cb_queue_handle(uint32_t queue_index);
 void sched_cb_queue_destroy_finalize(uint32_t queue_index);
-int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num);
+void sched_cb_queue_set_status(uint32_t queue_index, int status);
+int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num,
+			     int update_status);
 int sched_cb_queue_empty(uint32_t queue_index);
 
 /* API functions */
diff --git a/platform/linux-generic/odp_queue_basic.c b/platform/linux-generic/odp_queue_basic.c
index e4f6fd820..d9e5fdcea 100644
--- a/platform/linux-generic/odp_queue_basic.c
+++ b/platform/linux-generic/odp_queue_basic.c
@@ -299,6 +299,17 @@  void sched_cb_queue_destroy_finalize(uint32_t queue_index)
 	UNLOCK(queue);
 }
 
+void sched_cb_queue_set_status(uint32_t queue_index, int status)
+{
+	queue_entry_t *queue = get_qentry(queue_index);
+
+	LOCK(queue);
+
+	queue->s.status = status;
+
+	UNLOCK(queue);
+}
+
 static int queue_destroy(odp_queue_t handle)
 {
 	queue_entry_t *queue;
@@ -493,7 +504,7 @@  static int queue_enq(odp_queue_t handle, odp_event_t ev)
 }
 
 static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
-			    int num)
+			    int num, int update_status)
 {
 	int status_sync = sched_fn->status_sync;
 	int num_deq;
@@ -515,7 +526,7 @@  static inline int deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
 
 	if (num_deq == 0) {
 		/* Already empty queue */
-		if (queue->s.status == QUEUE_STATUS_SCHED) {
+		if (update_status && queue->s.status == QUEUE_STATUS_SCHED) {
 			queue->s.status = QUEUE_STATUS_NOTSCHED;
 
 			if (status_sync)
@@ -542,7 +553,7 @@  static int queue_int_deq_multi(queue_t q_int, odp_buffer_hdr_t *buf_hdr[],
 {
 	queue_entry_t *queue = qentry_from_int(q_int);
 
-	return deq_multi(queue, buf_hdr, num);
+	return deq_multi(queue, buf_hdr, num, 0);
 }
 
 static odp_buffer_hdr_t *queue_int_deq(queue_t q_int)
@@ -551,7 +562,7 @@  static odp_buffer_hdr_t *queue_int_deq(queue_t q_int)
 	odp_buffer_hdr_t *buf_hdr = NULL;
 	int ret;
 
-	ret = deq_multi(queue, &buf_hdr, 1);
+	ret = deq_multi(queue, &buf_hdr, 1, 0);
 
 	if (ret == 1)
 		return buf_hdr;
@@ -661,11 +672,12 @@  static int queue_info(odp_queue_t handle, odp_queue_info_t *info)
 	return 0;
 }
 
-int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num)
+int sched_cb_queue_deq_multi(uint32_t queue_index, odp_event_t ev[], int num,
+			     int update_status)
 {
 	queue_entry_t *qe = get_qentry(queue_index);
 
-	return deq_multi(qe, (odp_buffer_hdr_t **)ev, num);
+	return deq_multi(qe, (odp_buffer_hdr_t **)ev, num, update_status);
 }
 
 int sched_cb_queue_empty(uint32_t queue_index)
diff --git a/platform/linux-generic/odp_schedule_basic.c b/platform/linux-generic/odp_schedule_basic.c
index e5e1fe60e..b3847ab91 100644
--- a/platform/linux-generic/odp_schedule_basic.c
+++ b/platform/linux-generic/odp_schedule_basic.c
@@ -50,39 +50,15 @@  ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 /* Size of poll weight table */
 #define WEIGHT_TBL_SIZE ((QUEUES_PER_PRIO - 1) * PREFER_RATIO)
 
-/* Packet input poll cmd queues */
-#define PKTIO_CMD_QUEUES  4
-
-/* Mask for wrapping command queues */
-#define PKTIO_CMD_QUEUE_MASK (PKTIO_CMD_QUEUES - 1)
-
-/* Maximum number of packet input queues per command */
-#define MAX_PKTIN 16
-
 /* Maximum number of packet IO interfaces */
 #define NUM_PKTIO ODP_CONFIG_PKTIO_ENTRIES
 
-/* Maximum number of pktio poll commands */
-#define NUM_PKTIO_CMD (MAX_PKTIN * NUM_PKTIO)
+/* Maximum pktin index. Needs to fit into 8 bits. */
+#define MAX_PKTIN_INDEX 255
 
 /* Not a valid index */
 #define NULL_INDEX ((uint32_t)-1)
 
-/* Not a valid poll command */
-#define PKTIO_CMD_INVALID NULL_INDEX
-
-/* Pktio command is free */
-#define PKTIO_CMD_FREE    PKTIO_CMD_INVALID
-
-/* Packet IO poll queue ring size. In worst case, all pktios have all pktins
- * enabled and one poll command is created per pktin queue. The ring size must
- * be larger than or equal to NUM_PKTIO_CMD / PKTIO_CMD_QUEUES, so that it can
- * hold all poll commands in the worst case. */
-#define PKTIO_RING_SIZE (NUM_PKTIO_CMD / PKTIO_CMD_QUEUES)
-
-/* Mask for wrapping around pktio poll command index */
-#define PKTIO_RING_MASK (PKTIO_RING_SIZE - 1)
-
 /* Priority queue ring size. In worst case, all event queues are scheduled
  * queues and have the same priority. The ring size must be larger than or
  * equal to ODP_CONFIG_QUEUES / QUEUES_PER_PRIO, so that it can hold all
@@ -90,7 +66,7 @@  ODP_STATIC_ASSERT((ODP_SCHED_PRIO_NORMAL > 0) &&
 #define PRIO_QUEUE_RING_SIZE  (ODP_CONFIG_QUEUES / QUEUES_PER_PRIO)
 
 /* Mask for wrapping around priority queue index */
-#define PRIO_QUEUE_MASK  (PRIO_QUEUE_RING_SIZE - 1)
+#define RING_MASK  (PRIO_QUEUE_RING_SIZE - 1)
 
 /* Priority queue empty, not a valid queue index. */
 #define PRIO_QUEUE_EMPTY NULL_INDEX
@@ -103,15 +79,6 @@  ODP_STATIC_ASSERT(CHECK_IS_POWER2(ODP_CONFIG_QUEUES),
 ODP_STATIC_ASSERT(CHECK_IS_POWER2(PRIO_QUEUE_RING_SIZE),
 		  "Ring_size_is_not_power_of_two");
 
-/* Ring size must be power of two, so that PKTIO_RING_MASK can be used. */
-ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_RING_SIZE),
-		  "pktio_ring_size_is_not_power_of_two");
-
-/* Number of commands queues must be power of two, so that PKTIO_CMD_QUEUE_MASK
- * can be used. */
-ODP_STATIC_ASSERT(CHECK_IS_POWER2(PKTIO_CMD_QUEUES),
-		  "pktio_cmd_queues_is_not_power_of_two");
-
 /* Mask of queues per priority */
 typedef uint8_t pri_mask_t;
 
@@ -150,7 +117,6 @@  typedef struct {
 	int index;
 	int pause;
 	uint16_t round;
-	uint16_t pktin_polls;
 	uint32_t queue_index;
 	odp_queue_t queue;
 	odp_event_t ev_stash[MAX_DEQ];
@@ -183,24 +149,6 @@  typedef struct ODP_ALIGNED_CACHE {
 
 } prio_queue_t;
 
-/* Packet IO queue */
-typedef struct ODP_ALIGNED_CACHE {
-	/* Ring header */
-	ring_t ring;
-
-	/* Ring data: pktio poll command indexes */
-	uint32_t cmd_index[PKTIO_RING_SIZE];
-
-} pktio_queue_t;
-
-/* Packet IO poll command */
-typedef struct {
-	int pktio_index;
-	int num_pktin;
-	int pktin[MAX_PKTIN];
-	uint32_t cmd_index;
-} pktio_cmd_t;
-
 /* Order context of a queue */
 typedef struct ODP_ALIGNED_CACHE {
 	/* Current ordered context id */
@@ -220,16 +168,6 @@  typedef struct {
 
 	prio_queue_t   prio_q[NUM_SCHED_GRPS][NUM_PRIO][QUEUES_PER_PRIO];
 
-	odp_spinlock_t poll_cmd_lock;
-	/* Number of commands in a command queue */
-	uint16_t       num_pktio_cmd[PKTIO_CMD_QUEUES];
-
-	/* Packet IO command queues */
-	pktio_queue_t  pktio_q[PKTIO_CMD_QUEUES];
-
-	/* Packet IO poll commands */
-	pktio_cmd_t    pktio_cmd[NUM_PKTIO_CMD];
-
 	odp_shm_t      shm;
 	uint32_t       pri_count[NUM_PRIO][QUEUES_PER_PRIO];
 
@@ -244,22 +182,34 @@  typedef struct {
 	} sched_grp[NUM_SCHED_GRPS];
 
 	struct {
-		int         grp;
-		int         prio;
-		int         queue_per_prio;
-		int         sync;
-		uint32_t    order_lock_count;
+		uint8_t grp;
+		uint8_t prio;
+		uint8_t queue_per_prio;
+		uint8_t sync;
+		uint8_t order_lock_count;
+		uint8_t poll_pktin;
+		uint8_t pktio_index;
+		uint8_t pktin_index;
 	} queue[ODP_CONFIG_QUEUES];
 
 	struct {
-		/* Number of active commands for a pktio interface */
-		int num_cmd;
+		int num_pktin;
 	} pktio[NUM_PKTIO];
+	odp_spinlock_t pktio_lock;
 
 	order_context_t order[ODP_CONFIG_QUEUES];
 
 } sched_global_t;
 
+/* Check that queue[] variables are large enough */
+ODP_STATIC_ASSERT(NUM_SCHED_GRPS  <= 256, "Group_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(NUM_PRIO        <= 256, "Prio_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(QUEUES_PER_PRIO <= 256,
+		  "Queues_per_prio_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(CONFIG_QUEUE_MAX_ORD_LOCKS <= 256,
+		  "Ordered_lock_count_does_not_fit_8_bits");
+ODP_STATIC_ASSERT(NUM_PKTIO        <= 256, "Pktio_index_does_not_fit_8_bits");
+
 /* Global scheduler context */
 static sched_global_t *sched;
 
@@ -337,16 +287,9 @@  static int schedule_init_global(void)
 		}
 	}
 
-	odp_spinlock_init(&sched->poll_cmd_lock);
-	for (i = 0; i < PKTIO_CMD_QUEUES; i++) {
-		ring_init(&sched->pktio_q[i].ring);
-
-		for (j = 0; j < PKTIO_RING_SIZE; j++)
-			sched->pktio_q[i].cmd_index[j] = PKTIO_CMD_INVALID;
-	}
-
-	for (i = 0; i < NUM_PKTIO_CMD; i++)
-		sched->pktio_cmd[i].cmd_index = PKTIO_CMD_FREE;
+	odp_spinlock_init(&sched->pktio_lock);
+	for (i = 0; i < NUM_PKTIO; i++)
+		sched->pktio[i].num_pktin = 0;
 
 	odp_spinlock_init(&sched->grp_lock);
 	odp_atomic_init_u32(&sched->grp_epoch, 0);
@@ -384,14 +327,14 @@  static int schedule_term_global(void)
 				ring_t *ring = &sched->prio_q[grp][i][j].ring;
 				uint32_t qi;
 
-				while ((qi = ring_deq(ring, PRIO_QUEUE_MASK)) !=
+				while ((qi = ring_deq(ring, RING_MASK)) !=
 				       RING_EMPTY) {
 					odp_event_t events[1];
 					int num;
 
 					num = sched_cb_queue_deq_multi(qi,
 								       events,
-								       1);
+								       1, 1);
 
 					if (num < 0)
 						queue_destroy_finalize(qi);
@@ -519,6 +462,9 @@  static int schedule_init_queue(uint32_t queue_index,
 	sched->queue[queue_index].queue_per_prio = queue_per_prio(queue_index);
 	sched->queue[queue_index].sync = sched_param->sync;
 	sched->queue[queue_index].order_lock_count = sched_param->lock_count;
+	sched->queue[queue_index].poll_pktin  = 0;
+	sched->queue[queue_index].pktio_index = 0;
+	sched->queue[queue_index].pktin_index = 0;
 
 	odp_atomic_init_u64(&sched->order[queue_index].ctx, 0);
 	odp_atomic_init_u64(&sched->order[queue_index].next_ctx, 0);
@@ -554,88 +500,39 @@  static void schedule_destroy_queue(uint32_t queue_index)
 		ODP_ERR("queue reorder incomplete\n");
 }
 
-static int poll_cmd_queue_idx(int pktio_index, int pktin_idx)
-{
-	return PKTIO_CMD_QUEUE_MASK & (pktio_index ^ pktin_idx);
-}
-
-static inline pktio_cmd_t *alloc_pktio_cmd(void)
-{
-	int i;
-	pktio_cmd_t *cmd = NULL;
-
-	odp_spinlock_lock(&sched->poll_cmd_lock);
-
-	/* Find next free command */
-	for (i = 0; i < NUM_PKTIO_CMD; i++) {
-		if (sched->pktio_cmd[i].cmd_index == PKTIO_CMD_FREE) {
-			cmd = &sched->pktio_cmd[i];
-			cmd->cmd_index = i;
-			break;
-		}
-	}
-
-	odp_spinlock_unlock(&sched->poll_cmd_lock);
-
-	return cmd;
-}
-
-static inline void free_pktio_cmd(pktio_cmd_t *cmd)
+static int schedule_sched_queue(uint32_t queue_index)
 {
-	odp_spinlock_lock(&sched->poll_cmd_lock);
-
-	cmd->cmd_index = PKTIO_CMD_FREE;
+	int grp            = sched->queue[queue_index].grp;
+	int prio           = sched->queue[queue_index].prio;
+	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
+	ring_t *ring       = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
-	odp_spinlock_unlock(&sched->poll_cmd_lock);
+	ring_enq(ring, RING_MASK, queue_index);
+	return 0;
 }
 
 static void schedule_pktio_start(int pktio_index, int num_pktin,
-				 int pktin_idx[], odp_queue_t odpq[] ODP_UNUSED)
+				 int pktin_idx[], odp_queue_t queue[])
 {
-	int i, idx;
-	pktio_cmd_t *cmd;
-
-	if (num_pktin > MAX_PKTIN)
-		ODP_ABORT("Too many input queues for scheduler\n");
+	int i;
+	uint32_t qi;
 
-	sched->pktio[pktio_index].num_cmd = num_pktin;
+	sched->pktio[pktio_index].num_pktin = num_pktin;
 
-	/* Create a pktio poll command per queue */
 	for (i = 0; i < num_pktin; i++) {
+		qi = queue_to_index(queue[i]);
+		sched->queue[qi].poll_pktin  = 1;
+		sched->queue[qi].pktio_index = pktio_index;
+		sched->queue[qi].pktin_index = pktin_idx[i];
 
-		cmd = alloc_pktio_cmd();
-
-		if (cmd == NULL)
-			ODP_ABORT("Scheduler out of pktio commands\n");
-
-		idx = poll_cmd_queue_idx(pktio_index, pktin_idx[i]);
+		ODP_ASSERT(pktin_idx[i] <= MAX_PKTIN_INDEX);
 
-		odp_spinlock_lock(&sched->poll_cmd_lock);
-		sched->num_pktio_cmd[idx]++;
-		odp_spinlock_unlock(&sched->poll_cmd_lock);
-
-		cmd->pktio_index = pktio_index;
-		cmd->num_pktin   = 1;
-		cmd->pktin[0]    = pktin_idx[i];
-		ring_enq(&sched->pktio_q[idx].ring, PKTIO_RING_MASK,
-			 cmd->cmd_index);
+		/* Start polling */
+		sched_cb_queue_set_status(qi, QUEUE_STATUS_SCHED);
+		schedule_sched_queue(qi);
 	}
 }
 
-static int schedule_pktio_stop(int pktio_index, int first_pktin)
-{
-	int num;
-	int idx = poll_cmd_queue_idx(pktio_index, first_pktin);
-
-	odp_spinlock_lock(&sched->poll_cmd_lock);
-	sched->num_pktio_cmd[idx]--;
-	sched->pktio[pktio_index].num_cmd--;
-	num = sched->pktio[pktio_index].num_cmd;
-	odp_spinlock_unlock(&sched->poll_cmd_lock);
-
-	return num;
-}
-
 static void schedule_release_atomic(void)
 {
 	uint32_t qi = sched_local.queue_index;
@@ -647,7 +544,7 @@  static void schedule_release_atomic(void)
 		ring_t *ring = &sched->prio_q[grp][prio][queue_per_prio].ring;
 
 		/* Release current atomic queue */
-		ring_enq(ring, PRIO_QUEUE_MASK, qi);
+		ring_enq(ring, RING_MASK, qi);
 		sched_local.queue_index = PRIO_QUEUE_EMPTY;
 	}
 }
@@ -797,6 +694,37 @@  static int schedule_ord_enq_multi(queue_t q_int, void *buf_hdr[],
 	return 1;
 }
 
+static inline int queue_is_pktin(uint32_t queue_index)
+{
+	return sched->queue[queue_index].poll_pktin;
+}
+
+static inline int poll_pktin(uint32_t qi)
+{
+	int pktio_index, pktin_index, num, num_pktin;
+
+	pktio_index = sched->queue[qi].pktio_index;
+	pktin_index = sched->queue[qi].pktin_index;
+
+	num = sched_cb_pktin_poll(pktio_index, 1, &pktin_index);
+
+	/* Pktio stopped or closed. Call stop_finalize when we have stopped
+	 * polling all pktin queues of the pktio. */
+	if (odp_unlikely(num < 0)) {
+		odp_spinlock_lock(&sched->pktio_lock);
+		sched->pktio[pktio_index].num_pktin--;
+		num_pktin = sched->pktio[pktio_index].num_pktin;
+		odp_spinlock_unlock(&sched->pktio_lock);
+
+		sched_cb_queue_set_status(qi, QUEUE_STATUS_NOTSCHED);
+
+		if (num_pktin == 0)
+			sched_cb_pktio_stop_finalize(pktio_index);
+	}
+
+	return num;
+}
+
 static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 				  unsigned int max_num, int grp, int first)
 {
@@ -820,6 +748,7 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 			int ordered;
 			odp_queue_t handle;
 			ring_t *ring;
+			int pktin;
 
 			if (id >= QUEUES_PER_PRIO)
 				id = 0;
@@ -834,7 +763,7 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 
 			/* Get queue index from the priority queue */
 			ring = &sched->prio_q[grp][prio][id].ring;
-			qi   = ring_deq(ring, PRIO_QUEUE_MASK);
+			qi   = ring_deq(ring, RING_MASK);
 
 			/* Priority queue empty */
 			if (qi == RING_EMPTY) {
@@ -857,8 +786,10 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 			if (ordered && max_num < MAX_DEQ)
 				max_deq = max_num;
 
+			pktin = queue_is_pktin(qi);
+
 			num = sched_cb_queue_deq_multi(qi, sched_local.ev_stash,
-						       max_deq);
+						       max_deq, !pktin);
 
 			if (num < 0) {
 				/* Destroyed queue. Continue scheduling the same
@@ -868,6 +799,20 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 			}
 
 			if (num == 0) {
+				/* Poll packet input. Continue scheduling queue
+				 * connected to a packet input. Move to the next
+				 * priority to avoid starvation of other
+				 * priorities. Stop scheduling queue when pktio
+				 * has been stopped. */
+				if (pktin) {
+					int num_pkt = poll_pktin(qi);
+
+					if (odp_likely(num_pkt >= 0)) {
+						ring_enq(ring, RING_MASK, qi);
+						break;
+					}
+				}
+
 				/* Remove empty queue from scheduling. Continue
 				 * scheduling the same priority queue. */
 				continue;
@@ -890,14 +835,14 @@  static inline int do_schedule_grp(odp_queue_t *out_queue, odp_event_t out_ev[],
 				sched_local.ordered.src_queue = qi;
 
 				/* Continue scheduling ordered queues */
-				ring_enq(ring, PRIO_QUEUE_MASK, qi);
+				ring_enq(ring, RING_MASK, qi);
 
 			} else if (queue_is_atomic(qi)) {
 				/* Hold queue during atomic access */
 				sched_local.queue_index = qi;
 			} else {
 				/* Continue scheduling the queue */
-				ring_enq(ring, PRIO_QUEUE_MASK, qi);
+				ring_enq(ring, RING_MASK, qi);
 			}
 
 			/* Output the source queue handle */
@@ -919,7 +864,7 @@  static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 {
 	int i, num_grp;
 	int ret;
-	int id, first, grp_id;
+	int first, grp_id;
 	uint16_t round;
 	uint32_t epoch;
 
@@ -972,66 +917,11 @@  static inline int do_schedule(odp_queue_t *out_queue, odp_event_t out_ev[],
 			grp_id = 0;
 	}
 
-	/*
-	 * Poll packet input when there are no events
-	 *   * Each thread starts the search for a poll command from its
-	 *     preferred command queue. If the queue is empty, it moves to other
-	 *     queues.
-	 *   * Most of the times, the search stops on the first command found to
-	 *     optimize multi-threaded performance. A small portion of polls
-	 *     have to do full iteration to avoid packet input starvation when
-	 *     there are less threads than command queues.
-	 */
-	id = sched_local.thr & PKTIO_CMD_QUEUE_MASK;
-
-	for (i = 0; i < PKTIO_CMD_QUEUES; i++, id = ((id + 1) &
-	     PKTIO_CMD_QUEUE_MASK)) {
-		ring_t *ring;
-		uint32_t cmd_index;
-		pktio_cmd_t *cmd;
-
-		if (odp_unlikely(sched->num_pktio_cmd[id] == 0))
-			continue;
-
-		ring      = &sched->pktio_q[id].ring;
-		cmd_index = ring_deq(ring, PKTIO_RING_MASK);
-
-		if (odp_unlikely(cmd_index == RING_EMPTY))
-			continue;
-
-		cmd = &sched->pktio_cmd[cmd_index];
-
-		/* Poll packet input */
-		if (odp_unlikely(sched_cb_pktin_poll(cmd->pktio_index,
-						     cmd->num_pktin,
-						     cmd->pktin))){
-			/* Pktio stopped or closed. Remove poll command and call
-			 * stop_finalize when all commands of the pktio has
-			 * been removed. */
-			if (schedule_pktio_stop(cmd->pktio_index,
-						cmd->pktin[0]) == 0)
-				sched_cb_pktio_stop_finalize(cmd->pktio_index);
-
-			free_pktio_cmd(cmd);
-		} else {
-			/* Continue scheduling the pktio */
-			ring_enq(ring, PKTIO_RING_MASK, cmd_index);
-
-			/* Do not iterate through all pktin poll command queues
-			 * every time. */
-			if (odp_likely(sched_local.pktin_polls & 0xf))
-				break;
-		}
-	}
-
-	sched_local.pktin_polls++;
 	return 0;
 }
 
-
-static int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
-			 odp_event_t out_ev[],
-			 unsigned int max_num)
+static inline int schedule_loop(odp_queue_t *out_queue, uint64_t wait,
+				odp_event_t out_ev[], unsigned int max_num)
 {
 	odp_time_t next, wtime;
 	int first = 1;
@@ -1387,17 +1277,6 @@  static void schedule_prefetch(int num ODP_UNUSED)
 {
 }
 
-static int schedule_sched_queue(uint32_t queue_index)
-{
-	int grp            = sched->queue[queue_index].grp;
-	int prio           = sched->queue[queue_index].prio;
-	int queue_per_prio = sched->queue[queue_index].queue_per_prio;
-	ring_t *ring       = &sched->prio_q[grp][prio][queue_per_prio].ring;
-
-	ring_enq(ring, PRIO_QUEUE_MASK, queue_index);
-	return 0;
-}
-
 static int schedule_num_grps(void)
 {
 	return NUM_SCHED_GRPS;
diff --git a/platform/linux-generic/odp_schedule_iquery.c b/platform/linux-generic/odp_schedule_iquery.c
index ea62c3645..1a82f48da 100644
--- a/platform/linux-generic/odp_schedule_iquery.c
+++ b/platform/linux-generic/odp_schedule_iquery.c
@@ -291,7 +291,7 @@  static int schedule_term_global(void)
 		odp_event_t events[1];
 
 		if (sched->availables[i])
-			count = sched_cb_queue_deq_multi(i, events, 1);
+			count = sched_cb_queue_deq_multi(i, events, 1, 1);
 
 		if (count < 0)
 			sched_cb_queue_destroy_finalize(i);
@@ -1527,7 +1527,7 @@  static inline int consume_queue(int prio, unsigned int queue_index)
 		max = 1;
 
 	count = sched_cb_queue_deq_multi(
-		queue_index, cache->stash, max);
+		queue_index, cache->stash, max, 1);
 
 	if (count < 0) {
 		DO_SCHED_UNLOCK();
diff --git a/platform/linux-generic/odp_schedule_sp.c b/platform/linux-generic/odp_schedule_sp.c
index 007d673f0..50390274b 100644
--- a/platform/linux-generic/odp_schedule_sp.c
+++ b/platform/linux-generic/odp_schedule_sp.c
@@ -559,7 +559,7 @@  static int schedule_multi(odp_queue_t *from, uint64_t wait,
 		}
 
 		qi  = cmd->s.index;
-		num = sched_cb_queue_deq_multi(qi, events, 1);
+		num = sched_cb_queue_deq_multi(qi, events, 1, 1);
 
 		if (num > 0) {
 			sched_local.cmd = cmd;