diff --git a/modules/indexer/code/indexer.go b/modules/indexer/code/indexer.go
index 4c7a1d4f17..9ae3abff60 100644
--- a/modules/indexer/code/indexer.go
+++ b/modules/indexer/code/indexer.go
@@ -133,11 +133,11 @@ func Init() {
 	// Create the Queue
 	switch setting.Indexer.RepoType {
 	case "bleve", "elasticsearch":
-		handler := func(data ...queue.Data) {
+		handler := func(data ...queue.Data) []queue.Data {
 			idx, err := indexer.get()
 			if idx == nil || err != nil {
 				log.Error("Codes indexer handler: unable to get indexer!")
-				return
+				return data
 			}
 
 			for _, datum := range data {
@@ -153,6 +153,7 @@ func Init() {
 					continue
 				}
 			}
+			return nil
 		}
 
 		indexerQueue = queue.CreateUniqueQueue("code_indexer", handler, &IndexerData{})
diff --git a/modules/indexer/issues/indexer.go b/modules/indexer/issues/indexer.go
index 8530210628..729981ec71 100644
--- a/modules/indexer/issues/indexer.go
+++ b/modules/indexer/issues/indexer.go
@@ -103,11 +103,11 @@ func InitIssueIndexer(syncReindex bool) {
 	// Create the Queue
 	switch setting.Indexer.IssueType {
 	case "bleve", "elasticsearch":
-		handler := func(data ...queue.Data) {
+		handler := func(data ...queue.Data) []queue.Data {
 			indexer := holder.get()
 			if indexer == nil {
 				log.Error("Issue indexer handler: unable to get indexer!")
-				return
+				return data
 			}
 
 			iData := make([]*IndexerData, 0, len(data))
@@ -127,6 +127,7 @@ func InitIssueIndexer(syncReindex bool) {
 			if err := indexer.Index(iData); err != nil {
 				log.Error("Error whilst indexing: %v Error: %v", iData, err)
 			}
+			return nil
 		}
 
 		issueIndexerQueue = queue.CreateQueue("issue_indexer", handler, &IndexerData{})
diff --git a/modules/indexer/stats/queue.go b/modules/indexer/stats/queue.go
index b458444697..f983fcd11d 100644
--- a/modules/indexer/stats/queue.go
+++ b/modules/indexer/stats/queue.go
@@ -17,13 +17,14 @@ import (
 var statsQueue queue.UniqueQueue
 
 // handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		opts := datum.(int64)
 		if err := indexer.Index(opts); err != nil {
 			log.Error("stats queue indexer.Index(%d) failed: %v", opts, err)
 		}
 	}
+	return nil
 }
 
 func initStatsQueue() error {
diff --git a/modules/notification/ui/ui.go b/modules/notification/ui/ui.go
index ecedd70193..a27c5f699c 100644
--- a/modules/notification/ui/ui.go
+++ b/modules/notification/ui/ui.go
@@ -38,13 +38,14 @@ func NewNotifier() base.Notifier {
 	return ns
 }
 
-func (ns *notificationService) handle(data ...queue.Data) {
+func (ns *notificationService) handle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		opts := datum.(issueNotificationOpts)
 		if err := models.CreateOrUpdateIssueNotifications(opts.IssueID, opts.CommentID, opts.NotificationAuthorID, opts.ReceiverID); err != nil {
 			log.Error("Was unable to create issue notification: %v", err)
 		}
 	}
+	return nil
 }
 
 func (ns *notificationService) Run() {
diff --git a/modules/queue/bytefifo.go b/modules/queue/bytefifo.go
index 3a10c8e125..bb98d468fb 100644
--- a/modules/queue/bytefifo.go
+++ b/modules/queue/bytefifo.go
@@ -16,6 +16,8 @@ type ByteFIFO interface {
 	Pop(ctx context.Context) ([]byte, error)
 	// Close this fifo
 	Close() error
+	// PushBack pushes data back to the top of the fifo
+	PushBack(ctx context.Context, data []byte) error
 }
 
 // UniqueByteFIFO defines a FIFO that Uniques its contents
@@ -50,6 +52,11 @@ func (*DummyByteFIFO) Len(ctx context.Context) int64 {
 	return 0
 }
 
+// PushBack pushes data back to the top of the fifo
+func (*DummyByteFIFO) PushBack(ctx context.Context, data []byte) error {
+	return nil
+}
+
 var _ UniqueByteFIFO = &DummyUniqueByteFIFO{}
 
 // DummyUniqueByteFIFO represents a dummy unique fifo
diff --git a/modules/queue/manager.go b/modules/queue/manager.go
index e0384d15a3..56298a3e00 100644
--- a/modules/queue/manager.go
+++ b/modules/queue/manager.go
@@ -54,6 +54,18 @@ type Flushable interface {
 	IsEmpty() bool
 }
 
+// Pausable represents a pool or queue that is Pausable
+type Pausable interface {
+	// IsPaused will return if the pool or queue is paused
+	IsPaused() bool
+	// Pause will pause the pool or queue
+	Pause()
+	// Resume will resume the pool or queue
+	Resume()
+	// IsPausedIsResumed will return a bool indicating if the pool or queue is paused and a channel that will be closed when it is resumed
+	IsPausedIsResumed() (paused, resumed <-chan struct{})
+}
+
 // ManagedPool is a simple interface to get certain details from a worker pool
 type ManagedPool interface {
 	// AddWorkers adds a number of worker as group to the pool with the provided timeout. A CancelFunc is provided to cancel the group
@@ -192,6 +204,14 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
 				wg.Done()
 				continue
 			}
+			if pausable, ok := mq.Managed.(Pausable); ok {
+				// no point flushing paused queues
+				if pausable.IsPaused() {
+					wg.Done()
+					continue
+				}
+			}
+
 			allEmpty = false
 			if flushable, ok := mq.Managed.(Flushable); ok {
 				log.Debug("Flushing (flushable) queue: %s", mq.Name)
@@ -215,7 +235,7 @@ func (m *Manager) FlushAll(baseCtx context.Context, timeout time.Duration) error
 			log.Debug("All queues are empty")
 			break
 		}
-		// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushign
+		// Ensure there are always at least 100ms between loops but not more if we've actually been doing some flushing
 		// but don't delay cancellation here.
 		select {
 		case <-ctx.Done():
@@ -298,6 +318,12 @@ func (q *ManagedQueue) AddWorkers(number int, timeout time.Duration) context.Can
 	return nil
 }
 
+// Flushable returns true if the queue is flushable
+func (q *ManagedQueue) Flushable() bool {
+	_, ok := q.Managed.(Flushable)
+	return ok
+}
+
 // Flush flushes the queue with a timeout
 func (q *ManagedQueue) Flush(timeout time.Duration) error {
 	if flushable, ok := q.Managed.(Flushable); ok {
@@ -315,6 +341,34 @@ func (q *ManagedQueue) IsEmpty() bool {
 	return true
 }
 
+// Pausable returns whether the queue is Pausable
+func (q *ManagedQueue) Pausable() bool {
+	_, ok := q.Managed.(Pausable)
+	return ok
+}
+
+// Pause pauses the queue
+func (q *ManagedQueue) Pause() {
+	if pausable, ok := q.Managed.(Pausable); ok {
+		pausable.Pause()
+	}
+}
+
+// IsPaused reveals if the queue is paused
+func (q *ManagedQueue) IsPaused() bool {
+	if pausable, ok := q.Managed.(Pausable); ok {
+		return pausable.IsPaused()
+	}
+	return false
+}
+
+// Resume resumes the queue
+func (q *ManagedQueue) Resume() {
+	if pausable, ok := q.Managed.(Pausable); ok {
+		pausable.Resume()
+	}
+}
+
 // NumberOfWorkers returns the number of workers in the queue
 func (q *ManagedQueue) NumberOfWorkers() int {
 	if pool, ok := q.Managed.(ManagedPool); ok {
diff --git a/modules/queue/queue.go b/modules/queue/queue.go
index 80a9f1f2c7..3a51965143 100644
--- a/modules/queue/queue.go
+++ b/modules/queue/queue.go
@@ -36,7 +36,7 @@ type Type string
 type Data interface{}
 
 // HandlerFunc is a function that takes a variable amount of data and processes it
-type HandlerFunc func(...Data)
+type HandlerFunc func(...Data) (unhandled []Data)
 
 // NewQueueFunc is a function that creates a queue
 type NewQueueFunc func(handler HandlerFunc, config, exemplar interface{}) (Queue, error)
@@ -61,6 +61,12 @@ type Queue interface {
 	Push(Data) error
 }
 
+// PushBackable queues can be pushed back to
+type PushBackable interface {
+	// PushBack pushes data back to the top of the fifo
+	PushBack(Data) error
+}
+
 // DummyQueueType is the type for the dummy queue
 const DummyQueueType Type = "dummy"
 
diff --git a/modules/queue/queue_bytefifo.go b/modules/queue/queue_bytefifo.go
index c4d5d20a89..0380497ea6 100644
--- a/modules/queue/queue_bytefifo.go
+++ b/modules/queue/queue_bytefifo.go
@@ -8,10 +8,12 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
 	"time"
 
 	"code.gitea.io/gitea/modules/json"
 	"code.gitea.io/gitea/modules/log"
+	"code.gitea.io/gitea/modules/util"
 )
 
 // ByteFIFOQueueConfiguration is the configuration for a ByteFIFOQueue
@@ -52,8 +54,7 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
 	terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
 	shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
 
-	return &ByteFIFOQueue{
-		WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
+	q := &ByteFIFOQueue{
 		byteFIFO:           byteFIFO,
 		typ:                typ,
 		shutdownCtx:        shutdownCtx,
@@ -65,7 +66,17 @@ func NewByteFIFOQueue(typ Type, byteFIFO ByteFIFO, handle HandlerFunc, cfg, exem
 		name:               config.Name,
 		waitOnEmpty:        config.WaitOnEmpty,
 		pushed:             make(chan struct{}, 1),
-	}, nil
+	}
+	q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
+		for _, unhandled := range handle(data...) {
+			if fail := q.PushBack(unhandled); fail != nil {
+				failed = append(failed, fail)
+			}
+		}
+		return
+	}, config.WorkerPoolConfiguration)
+
+	return q, nil
 }
 
 // Name returns the name of this queue
@@ -78,6 +89,24 @@ func (q *ByteFIFOQueue) Push(data Data) error {
 	return q.PushFunc(data, nil)
 }
 
+// PushBack pushes data to the fifo
+func (q *ByteFIFOQueue) PushBack(data Data) error {
+	if !assignableTo(data, q.exemplar) {
+		return fmt.Errorf("Unable to assign data: %v to same type as exemplar: %v in %s", data, q.exemplar, q.name)
+	}
+	bs, err := json.Marshal(data)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		select {
+		case q.pushed <- struct{}{}:
+		default:
+		}
+	}()
+	return q.byteFIFO.PushBack(q.terminateCtx, bs)
+}
+
 // PushFunc pushes data to the fifo
 func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
 	if !assignableTo(data, q.exemplar) {
@@ -87,14 +116,12 @@ func (q *ByteFIFOQueue) PushFunc(data Data, fn func() error) error {
 	if err != nil {
 		return err
 	}
-	if q.waitOnEmpty {
-		defer func() {
-			select {
-			case q.pushed <- struct{}{}:
-			default:
-			}
-		}()
-	}
+	defer func() {
+		select {
+		case q.pushed <- struct{}{}:
+		default:
+		}
+	}()
 	return q.byteFIFO.PushFunc(q.terminateCtx, bs, fn)
 }
 
@@ -108,6 +135,15 @@ func (q *ByteFIFOQueue) IsEmpty() bool {
 	return q.byteFIFO.Len(q.terminateCtx) == 0
 }
 
+// Flush flushes the ByteFIFOQueue
+func (q *ByteFIFOQueue) Flush(timeout time.Duration) error {
+	select {
+	case q.pushed <- struct{}{}:
+	default:
+	}
+	return q.WorkerPool.Flush(timeout)
+}
+
 // Run runs the bytefifo queue
 func (q *ByteFIFOQueue) Run(atShutdown, atTerminate func(func())) {
 	atShutdown(q.Shutdown)
@@ -142,31 +178,67 @@ func (q *ByteFIFOQueue) readToChan() {
 
 	// Default backoff values
 	backOffTime := time.Millisecond * 100
+	backOffTimer := time.NewTimer(0)
+	util.StopTimer(backOffTimer)
+
+	paused, _ := q.IsPausedIsResumed()
 
 loop:
 	for {
-		err := q.doPop()
-		if err == errQueueEmpty {
-			log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
+		select {
+		case <-paused:
+			log.Trace("Queue %s pausing", q.name)
+			_, resumed := q.IsPausedIsResumed()
+
 			select {
-			case <-q.pushed:
-				// reset backOffTime
-				backOffTime = 100 * time.Millisecond
-				continue loop
+			case <-resumed:
+				paused, _ = q.IsPausedIsResumed()
+				log.Trace("Queue %s resuming", q.name)
+				if q.HasNoWorkerScaling() {
+					log.Warn(
+						"Queue: %s is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+							"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", q.name)
+					q.Pause()
+					continue loop
+				}
 			case <-q.shutdownCtx.Done():
-				// Oops we've been shutdown whilst waiting
-				// Make sure the worker pool is shutdown too
+				// tell the pool to shutdown.
 				q.baseCtxCancel()
 				return
+			case data := <-q.dataChan:
+				if err := q.PushBack(data); err != nil {
+					log.Error("Unable to push back data into queue %s", q.name)
+				}
+				atomic.AddInt64(&q.numInQueue, -1)
 			}
+		default:
 		}
 
-		// Reset the backOffTime if there is no error or an unmarshalError
-		if err == nil || err == errUnmarshal {
-			backOffTime = 100 * time.Millisecond
+		// empty the pushed channel
+		select {
+		case <-q.pushed:
+		default:
 		}
 
+		err := q.doPop()
+
+		util.StopTimer(backOffTimer)
+
 		if err != nil {
+			if err == errQueueEmpty && q.waitOnEmpty {
+				log.Trace("%s: %s Waiting on Empty", q.typ, q.name)
+
+				// reset the backoff time but don't set the timer
+				backOffTime = 100 * time.Millisecond
+			} else if err == errUnmarshal {
+				// reset the timer and backoff
+				backOffTime = 100 * time.Millisecond
+				backOffTimer.Reset(backOffTime)
+			} else {
+				//  backoff
+				backOffTimer.Reset(backOffTime)
+			}
+
 			// Need to Backoff
 			select {
 			case <-q.shutdownCtx.Done():
@@ -174,8 +246,13 @@ loop:
 				// Make sure the worker pool is shutdown too
 				q.baseCtxCancel()
 				return
-			case <-time.After(backOffTime):
-				// OK we've waited - so backoff a bit
+			case <-q.pushed:
+				// Data has been pushed to the fifo (or flush has been called)
+				// reset the backoff time
+				backOffTime = 100 * time.Millisecond
+				continue loop
+			case <-backOffTimer.C:
+				// Calculate the next backoff time
 				backOffTime += backOffTime / 2
 				if backOffTime > maxBackOffTime {
 					backOffTime = maxBackOffTime
@@ -183,6 +260,10 @@ loop:
 				continue loop
 			}
 		}
+
+		// Reset the backoff time
+		backOffTime = 100 * time.Millisecond
+
 		select {
 		case <-q.shutdownCtx.Done():
 			// Oops we've been shutdown
@@ -289,9 +370,8 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
 	terminateCtx, terminateCtxCancel := context.WithCancel(context.Background())
 	shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
 
-	return &ByteFIFOUniqueQueue{
+	q := &ByteFIFOUniqueQueue{
 		ByteFIFOQueue: ByteFIFOQueue{
-			WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
 			byteFIFO:           byteFIFO,
 			typ:                typ,
 			shutdownCtx:        shutdownCtx,
@@ -302,7 +382,17 @@ func NewByteFIFOUniqueQueue(typ Type, byteFIFO UniqueByteFIFO, handle HandlerFun
 			workers:            config.Workers,
 			name:               config.Name,
 		},
-	}, nil
+	}
+	q.WorkerPool = NewWorkerPool(func(data ...Data) (failed []Data) {
+		for _, unhandled := range handle(data...) {
+			if fail := q.PushBack(unhandled); fail != nil {
+				failed = append(failed, fail)
+			}
+		}
+		return
+	}, config.WorkerPoolConfiguration)
+
+	return q, nil
 }
 
 // Has checks if the provided data is in the queue
diff --git a/modules/queue/queue_channel.go b/modules/queue/queue_channel.go
index 4df64b69ee..7de9c17c86 100644
--- a/modules/queue/queue_channel.go
+++ b/modules/queue/queue_channel.go
@@ -7,6 +7,8 @@ package queue
 import (
 	"context"
 	"fmt"
+	"sync/atomic"
+	"time"
 
 	"code.gitea.io/gitea/modules/log"
 )
@@ -51,7 +53,6 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
 	shutdownCtx, shutdownCtxCancel := context.WithCancel(terminateCtx)
 
 	queue := &ChannelQueue{
-		WorkerPool:         NewWorkerPool(handle, config.WorkerPoolConfiguration),
 		shutdownCtx:        shutdownCtx,
 		shutdownCtxCancel:  shutdownCtxCancel,
 		terminateCtx:       terminateCtx,
@@ -60,6 +61,23 @@ func NewChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, erro
 		workers:            config.Workers,
 		name:               config.Name,
 	}
+	queue.WorkerPool = NewWorkerPool(func(data ...Data) []Data {
+		unhandled := handle(data...)
+		if len(unhandled) > 0 {
+			// We can only pushback to the channel if we're paused.
+			if queue.IsPaused() {
+				atomic.AddInt64(&queue.numInQueue, int64(len(unhandled)))
+				go func() {
+					for _, datum := range data {
+						queue.dataChan <- datum
+					}
+				}()
+				return nil
+			}
+		}
+		return unhandled
+	}, config.WorkerPoolConfiguration)
+
 	queue.qid = GetManager().Add(queue, ChannelQueueType, config, exemplar)
 	return queue, nil
 }
@@ -81,6 +99,39 @@ func (q *ChannelQueue) Push(data Data) error {
 	return nil
 }
 
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (q *ChannelQueue) Flush(timeout time.Duration) error {
+	if q.IsPaused() {
+		return nil
+	}
+	ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
+	defer cancel()
+	return q.FlushWithContext(ctx)
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+func (q *ChannelQueue) FlushWithContext(ctx context.Context) error {
+	log.Trace("ChannelQueue: %d Flush", q.qid)
+	paused, _ := q.IsPausedIsResumed()
+	for {
+		select {
+		case <-paused:
+			return nil
+		case data := <-q.dataChan:
+			if unhandled := q.handle(data); unhandled != nil {
+				log.Error("Unhandled Data whilst flushing queue %d", q.qid)
+			}
+			atomic.AddInt64(&q.numInQueue, -1)
+		case <-q.baseCtx.Done():
+			return q.baseCtx.Err()
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			return nil
+		}
+	}
+}
+
 // Shutdown processing from this queue
 func (q *ChannelQueue) Shutdown() {
 	q.lock.Lock()
@@ -94,6 +145,7 @@ func (q *ChannelQueue) Shutdown() {
 	log.Trace("ChannelQueue: %s Shutting down", q.name)
 	go func() {
 		log.Trace("ChannelQueue: %s Flushing", q.name)
+		// We can't use Cleanup here because that will close the channel
 		if err := q.FlushWithContext(q.terminateCtx); err != nil {
 			log.Warn("ChannelQueue: %s Terminated before completed flushing", q.name)
 			return
diff --git a/modules/queue/queue_channel_test.go b/modules/queue/queue_channel_test.go
index f1ddd7ec92..b700b28a14 100644
--- a/modules/queue/queue_channel_test.go
+++ b/modules/queue/queue_channel_test.go
@@ -5,6 +5,7 @@
 package queue
 
 import (
+	"sync"
 	"testing"
 	"time"
 
@@ -13,11 +14,12 @@ import (
 
 func TestChannelQueue(t *testing.T) {
 	handleChan := make(chan *testData)
-	handle := func(data ...Data) {
+	handle := func(data ...Data) []Data {
 		for _, datum := range data {
 			testDatum := datum.(*testData)
 			handleChan <- testDatum
 		}
+		return nil
 	}
 
 	nilFn := func(_ func()) {}
@@ -52,12 +54,13 @@ func TestChannelQueue(t *testing.T) {
 
 func TestChannelQueue_Batch(t *testing.T) {
 	handleChan := make(chan *testData)
-	handle := func(data ...Data) {
+	handle := func(data ...Data) []Data {
 		assert.True(t, len(data) == 2)
 		for _, datum := range data {
 			testDatum := datum.(*testData)
 			handleChan <- testDatum
 		}
+		return nil
 	}
 
 	nilFn := func(_ func()) {}
@@ -95,3 +98,156 @@ func TestChannelQueue_Batch(t *testing.T) {
 	err = queue.Push(test1)
 	assert.Error(t, err)
 }
+
+func TestChannelQueue_Pause(t *testing.T) {
+	lock := sync.Mutex{}
+	var queue Queue
+	var err error
+	pushBack := false
+	handleChan := make(chan *testData)
+	handle := func(data ...Data) []Data {
+		lock.Lock()
+		if pushBack {
+			if pausable, ok := queue.(Pausable); ok {
+				pausable.Pause()
+			}
+			pushBack = false
+			lock.Unlock()
+			return data
+		}
+		lock.Unlock()
+
+		for _, datum := range data {
+			testDatum := datum.(*testData)
+			handleChan <- testDatum
+		}
+		return nil
+	}
+	nilFn := func(_ func()) {}
+
+	queue, err = NewChannelQueue(handle,
+		ChannelQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  20,
+				BatchLength:  1,
+				BlockTimeout: 0,
+				BoostTimeout: 0,
+				BoostWorkers: 0,
+				MaxWorkers:   10,
+			},
+			Workers: 1,
+		}, &testData{})
+	assert.NoError(t, err)
+
+	go queue.Run(nilFn, nilFn)
+
+	test1 := testData{"A", 1}
+	test2 := testData{"B", 2}
+	queue.Push(&test1)
+
+	pausable, ok := queue.(Pausable)
+	if !assert.True(t, ok) {
+		return
+	}
+	result1 := <-handleChan
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	pausable.Pause()
+
+	paused, resumed := pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	queue.Push(&test2)
+
+	var result2 *testData
+	select {
+	case result2 = <-handleChan:
+		assert.Fail(t, "handler chan should be empty")
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	assert.Nil(t, result2)
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result2 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test2")
+	}
+
+	assert.Equal(t, test2.TestString, result2.TestString)
+	assert.Equal(t, test2.TestInt, result2.TestInt)
+
+	lock.Lock()
+	pushBack = true
+	lock.Unlock()
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+		assert.Fail(t, "Queue should not be paused")
+		return
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue is not resumed")
+		return
+	}
+
+	queue.Push(&test1)
+
+	select {
+	case <-paused:
+	case <-handleChan:
+		assert.Fail(t, "handler chan should not contain test1")
+		return
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "queue should be paused")
+		return
+	}
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result1 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test1")
+	}
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+}
diff --git a/modules/queue/queue_disk.go b/modules/queue/queue_disk.go
index 911233a5d9..2691ab02f5 100644
--- a/modules/queue/queue_disk.go
+++ b/modules/queue/queue_disk.go
@@ -94,6 +94,11 @@ func (fifo *LevelQueueByteFIFO) PushFunc(ctx context.Context, data []byte, fn fu
 	return fifo.internal.LPush(data)
 }
 
+// PushBack pushes data to the top of the fifo
+func (fifo *LevelQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
+	return fifo.internal.RPush(data)
+}
+
 // Pop pops data from the start of the fifo
 func (fifo *LevelQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
 	data, err := fifo.internal.RPop()
diff --git a/modules/queue/queue_disk_channel.go b/modules/queue/queue_disk_channel.go
index f3cd132d7d..3b21575a0e 100644
--- a/modules/queue/queue_disk_channel.go
+++ b/modules/queue/queue_disk_channel.go
@@ -51,7 +51,20 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 	}
 	config := configInterface.(PersistableChannelQueueConfiguration)
 
-	channelQueue, err := NewChannelQueue(handle, ChannelQueueConfiguration{
+	queue := &PersistableChannelQueue{
+		closed: make(chan struct{}),
+	}
+
+	wrappedHandle := func(data ...Data) (failed []Data) {
+		for _, unhandled := range handle(data...) {
+			if fail := queue.PushBack(unhandled); fail != nil {
+				failed = append(failed, fail)
+			}
+		}
+		return
+	}
+
+	channelQueue, err := NewChannelQueue(wrappedHandle, ChannelQueueConfiguration{
 		WorkerPoolConfiguration: WorkerPoolConfiguration{
 			QueueLength:  config.QueueLength,
 			BatchLength:  config.BatchLength,
@@ -84,15 +97,12 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 		DataDir: config.DataDir,
 	}
 
-	levelQueue, err := NewLevelQueue(handle, levelCfg, exemplar)
+	levelQueue, err := NewLevelQueue(wrappedHandle, levelCfg, exemplar)
 	if err == nil {
-		queue := &PersistableChannelQueue{
-			channelQueue: channelQueue.(*ChannelQueue),
-			delayedStarter: delayedStarter{
-				internal: levelQueue.(*LevelQueue),
-				name:     config.Name,
-			},
-			closed: make(chan struct{}),
+		queue.channelQueue = channelQueue.(*ChannelQueue)
+		queue.delayedStarter = delayedStarter{
+			internal: levelQueue.(*LevelQueue),
+			name:     config.Name,
 		}
 		_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
 		return queue, nil
@@ -102,16 +112,13 @@ func NewPersistableChannelQueue(handle HandlerFunc, cfg, exemplar interface{}) (
 		return nil, ErrInvalidConfiguration{cfg: cfg}
 	}
 
-	queue := &PersistableChannelQueue{
-		channelQueue: channelQueue.(*ChannelQueue),
-		delayedStarter: delayedStarter{
-			cfg:         levelCfg,
-			underlying:  LevelQueueType,
-			timeout:     config.Timeout,
-			maxAttempts: config.MaxAttempts,
-			name:        config.Name,
-		},
-		closed: make(chan struct{}),
+	queue.channelQueue = channelQueue.(*ChannelQueue)
+	queue.delayedStarter = delayedStarter{
+		cfg:         levelCfg,
+		underlying:  LevelQueueType,
+		timeout:     config.Timeout,
+		maxAttempts: config.MaxAttempts,
+		name:        config.Name,
 	}
 	_ = GetManager().Add(queue, PersistableChannelQueueType, config, exemplar)
 	return queue, nil
@@ -132,6 +139,19 @@ func (q *PersistableChannelQueue) Push(data Data) error {
 	}
 }
 
+// PushBack will push the indexer data to queue
+func (q *PersistableChannelQueue) PushBack(data Data) error {
+	select {
+	case <-q.closed:
+		if pbr, ok := q.internal.(PushBackable); ok {
+			return pbr.PushBack(data)
+		}
+		return q.internal.Push(data)
+	default:
+		return q.channelQueue.Push(data)
+	}
+}
+
 // Run starts to run the queue
 func (q *PersistableChannelQueue) Run(atShutdown, atTerminate func(func())) {
 	log.Debug("PersistableChannelQueue: %s Starting", q.delayedStarter.name)
@@ -226,6 +246,48 @@ func (q *PersistableChannelQueue) IsEmpty() bool {
 	return q.internal.IsEmpty()
 }
 
+// IsPaused returns if the pool is paused
+func (q *PersistableChannelQueue) IsPaused() bool {
+	return q.channelQueue.IsPaused()
+}
+
+// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
+func (q *PersistableChannelQueue) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) {
+	return q.channelQueue.IsPausedIsResumed()
+}
+
+// Pause pauses the WorkerPool
+func (q *PersistableChannelQueue) Pause() {
+	q.channelQueue.Pause()
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal == nil {
+		return
+	}
+
+	pausable, ok := q.internal.(Pausable)
+	if !ok {
+		return
+	}
+	pausable.Pause()
+}
+
+// Resume resumes the WorkerPool
+func (q *PersistableChannelQueue) Resume() {
+	q.channelQueue.Resume()
+	q.lock.Lock()
+	defer q.lock.Unlock()
+	if q.internal == nil {
+		return
+	}
+
+	pausable, ok := q.internal.(Pausable)
+	if !ok {
+		return
+	}
+	pausable.Resume()
+}
+
 // Shutdown processing this queue
 func (q *PersistableChannelQueue) Shutdown() {
 	log.Trace("PersistableChannelQueue: %s Shutting down", q.delayedStarter.name)
diff --git a/modules/queue/queue_disk_channel_test.go b/modules/queue/queue_disk_channel_test.go
index db12d9575c..9bbd146efe 100644
--- a/modules/queue/queue_disk_channel_test.go
+++ b/modules/queue/queue_disk_channel_test.go
@@ -8,7 +8,9 @@ import (
 	"os"
 	"sync"
 	"testing"
+	"time"
 
+	"code.gitea.io/gitea/modules/log"
 	"code.gitea.io/gitea/modules/util"
 
 	"github.com/stretchr/testify/assert"
@@ -16,7 +18,7 @@ import (
 
 func TestPersistableChannelQueue(t *testing.T) {
 	handleChan := make(chan *testData)
-	handle := func(data ...Data) {
+	handle := func(data ...Data) []Data {
 		for _, datum := range data {
 			if datum == nil {
 				continue
@@ -24,6 +26,7 @@ func TestPersistableChannelQueue(t *testing.T) {
 			testDatum := datum.(*testData)
 			handleChan <- testDatum
 		}
+		return nil
 	}
 
 	lock := sync.Mutex{}
@@ -189,3 +192,290 @@ func TestPersistableChannelQueue(t *testing.T) {
 		callback()
 	}
 }
+
+func TestPersistableChannelQueue_Pause(t *testing.T) {
+	lock := sync.Mutex{}
+	var queue Queue
+	var err error
+	pushBack := false
+
+	handleChan := make(chan *testData)
+	handle := func(data ...Data) []Data {
+		lock.Lock()
+		if pushBack {
+			if pausable, ok := queue.(Pausable); ok {
+				log.Info("pausing")
+				pausable.Pause()
+			}
+			pushBack = false
+			lock.Unlock()
+			return data
+		}
+		lock.Unlock()
+
+		for _, datum := range data {
+			testDatum := datum.(*testData)
+			handleChan <- testDatum
+		}
+		return nil
+	}
+
+	queueShutdown := []func(){}
+	queueTerminate := []func(){}
+
+	tmpDir, err := os.MkdirTemp("", "persistable-channel-queue-pause-test-data")
+	assert.NoError(t, err)
+	defer util.RemoveAll(tmpDir)
+
+	queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+		DataDir:      tmpDir,
+		BatchLength:  2,
+		QueueLength:  20,
+		Workers:      1,
+		BoostWorkers: 0,
+		MaxWorkers:   10,
+		Name:         "first",
+	}, &testData{})
+	assert.NoError(t, err)
+
+	go queue.Run(func(shutdown func()) {
+		lock.Lock()
+		defer lock.Unlock()
+		queueShutdown = append(queueShutdown, shutdown)
+	}, func(terminate func()) {
+		lock.Lock()
+		defer lock.Unlock()
+		queueTerminate = append(queueTerminate, terminate)
+	})
+
+	test1 := testData{"A", 1}
+	test2 := testData{"B", 2}
+
+	err = queue.Push(&test1)
+	assert.NoError(t, err)
+
+	pausable, ok := queue.(Pausable)
+	if !assert.True(t, ok) {
+		return
+	}
+	result1 := <-handleChan
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	pausable.Pause()
+	paused, resumed := pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	queue.Push(&test2)
+
+	var result2 *testData
+	select {
+	case result2 = <-handleChan:
+		assert.Fail(t, "handler chan should be empty")
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	assert.Nil(t, result2)
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result2 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test2")
+	}
+
+	assert.Equal(t, test2.TestString, result2.TestString)
+	assert.Equal(t, test2.TestInt, result2.TestInt)
+
+	lock.Lock()
+	pushBack = true
+	lock.Unlock()
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+		assert.Fail(t, "Queue should not be paused")
+		return
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue is not resumed")
+		return
+	}
+
+	queue.Push(&test1)
+
+	select {
+	case <-paused:
+	case <-handleChan:
+		assert.Fail(t, "handler chan should not contain test1")
+		return
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "queue should be paused")
+		return
+	}
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result1 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test1")
+	}
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	lock.Lock()
+	callbacks := make([]func(), len(queueShutdown))
+	copy(callbacks, queueShutdown)
+	lock.Unlock()
+	// Now shutdown the queue
+	for _, callback := range callbacks {
+		callback()
+	}
+
+	// Wait til it is closed
+	<-queue.(*PersistableChannelQueue).closed
+
+	err = queue.Push(&test1)
+	assert.NoError(t, err)
+	err = queue.Push(&test2)
+	assert.NoError(t, err)
+	select {
+	case <-handleChan:
+		assert.Fail(t, "Handler processing should have stopped")
+	default:
+	}
+
+	// terminate the queue
+	lock.Lock()
+	callbacks = make([]func(), len(queueTerminate))
+	copy(callbacks, queueTerminate)
+	lock.Unlock()
+	for _, callback := range callbacks {
+		callback()
+	}
+
+	select {
+	case <-handleChan:
+		assert.Fail(t, "Handler processing should have stopped")
+	default:
+	}
+
+	lock.Lock()
+	pushBack = true
+	lock.Unlock()
+
+	// Reopen queue
+	queue, err = NewPersistableChannelQueue(handle, PersistableChannelQueueConfiguration{
+		DataDir:      tmpDir,
+		BatchLength:  1,
+		QueueLength:  20,
+		Workers:      1,
+		BoostWorkers: 0,
+		MaxWorkers:   10,
+		Name:         "second",
+	}, &testData{})
+	assert.NoError(t, err)
+	pausable, ok = queue.(Pausable)
+	if !assert.True(t, ok) {
+		return
+	}
+
+	paused, _ = pausable.IsPausedIsResumed()
+
+	go queue.Run(func(shutdown func()) {
+		lock.Lock()
+		defer lock.Unlock()
+		queueShutdown = append(queueShutdown, shutdown)
+	}, func(terminate func()) {
+		lock.Lock()
+		defer lock.Unlock()
+		queueTerminate = append(queueTerminate, terminate)
+	})
+
+	select {
+	case <-handleChan:
+		assert.Fail(t, "Handler processing should have stopped")
+	case <-paused:
+	}
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	select {
+	case <-handleChan:
+		assert.Fail(t, "Handler processing should have stopped")
+	default:
+	}
+
+	pausable.Resume()
+
+	result3 := <-handleChan
+	result4 := <-handleChan
+	if result4.TestString == test1.TestString {
+		result3, result4 = result4, result3
+	}
+	assert.Equal(t, test1.TestString, result3.TestString)
+	assert.Equal(t, test1.TestInt, result3.TestInt)
+
+	assert.Equal(t, test2.TestString, result4.TestString)
+	assert.Equal(t, test2.TestInt, result4.TestInt)
+	lock.Lock()
+	callbacks = make([]func(), len(queueShutdown))
+	copy(callbacks, queueShutdown)
+	lock.Unlock()
+	for _, callback := range callbacks {
+		callback()
+	}
+	lock.Lock()
+	callbacks = make([]func(), len(queueTerminate))
+	copy(callbacks, queueTerminate)
+	lock.Unlock()
+	for _, callback := range callbacks {
+		callback()
+	}
+}
diff --git a/modules/queue/queue_disk_test.go b/modules/queue/queue_disk_test.go
index a2c21fec08..d2d8e135cb 100644
--- a/modules/queue/queue_disk_test.go
+++ b/modules/queue/queue_disk_test.go
@@ -17,12 +17,13 @@ import (
 
 func TestLevelQueue(t *testing.T) {
 	handleChan := make(chan *testData)
-	handle := func(data ...Data) {
+	handle := func(data ...Data) []Data {
 		assert.True(t, len(data) == 2)
 		for _, datum := range data {
 			testDatum := datum.(*testData)
 			handleChan <- testDatum
 		}
+		return nil
 	}
 
 	var lock sync.Mutex
diff --git a/modules/queue/queue_redis.go b/modules/queue/queue_redis.go
index a5fb866dc1..84ab235d5e 100644
--- a/modules/queue/queue_redis.go
+++ b/modules/queue/queue_redis.go
@@ -57,6 +57,7 @@ func NewRedisQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue, error)
 
 type redisClient interface {
 	RPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
+	LPush(ctx context.Context, key string, args ...interface{}) *redis.IntCmd
 	LPop(ctx context.Context, key string) *redis.StringCmd
 	LLen(ctx context.Context, key string) *redis.IntCmd
 	SAdd(ctx context.Context, key string, members ...interface{}) *redis.IntCmd
@@ -103,6 +104,11 @@ func (fifo *RedisByteFIFO) PushFunc(ctx context.Context, data []byte, fn func()
 	return fifo.client.RPush(ctx, fifo.queueName, data).Err()
 }
 
+// PushBack pushes data to the top of the fifo
+func (fifo *RedisByteFIFO) PushBack(ctx context.Context, data []byte) error {
+	return fifo.client.LPush(ctx, fifo.queueName, data).Err()
+}
+
 // Pop pops data from the start of the fifo
 func (fifo *RedisByteFIFO) Pop(ctx context.Context) ([]byte, error) {
 	data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
diff --git a/modules/queue/setting.go b/modules/queue/setting.go
index caaf123d42..61f156c377 100644
--- a/modules/queue/setting.go
+++ b/modules/queue/setting.go
@@ -65,6 +65,16 @@ func CreateQueue(name string, handle HandlerFunc, exemplar interface{}) Queue {
 		log.Error("Unable to create queue for %s: %v", name, err)
 		return nil
 	}
+
+	// Sanity check configuration
+	if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) {
+		log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name)
+		if pausable, ok := returnable.(Pausable); ok {
+			log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name)
+			pausable.Pause()
+		}
+	}
+
 	return returnable
 }
 
@@ -103,5 +113,15 @@ func CreateUniqueQueue(name string, handle HandlerFunc, exemplar interface{}) Un
 		log.Error("Unable to create unique queue for %s: %v", name, err)
 		return nil
 	}
+
+	// Sanity check configuration
+	if q.Workers == 0 && (q.BoostTimeout == 0 || q.BoostWorkers == 0 || q.MaxWorkers == 0) {
+		log.Warn("Queue: %s is configured to be non-scaling and have no workers\n - this configuration is likely incorrect and could cause Gitea to block", q.Name)
+		if pausable, ok := returnable.(Pausable); ok {
+			log.Warn("Queue: %s is being paused to prevent data-loss, add workers manually and unpause.", q.Name)
+			pausable.Pause()
+		}
+	}
+
 	return returnable.(UniqueQueue)
 }
diff --git a/modules/queue/unique_queue_channel.go b/modules/queue/unique_queue_channel.go
index f617595c04..b6d2e770fc 100644
--- a/modules/queue/unique_queue_channel.go
+++ b/modules/queue/unique_queue_channel.go
@@ -8,6 +8,8 @@ import (
 	"context"
 	"fmt"
 	"sync"
+	"sync/atomic"
+	"time"
 
 	"code.gitea.io/gitea/modules/json"
 	"code.gitea.io/gitea/modules/log"
@@ -64,7 +66,7 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
 		workers:            config.Workers,
 		name:               config.Name,
 	}
-	queue.WorkerPool = NewWorkerPool(func(data ...Data) {
+	queue.WorkerPool = NewWorkerPool(func(data ...Data) (unhandled []Data) {
 		for _, datum := range data {
 			// No error is possible here because PushFunc ensures that this can be marshalled
 			bs, _ := json.Marshal(datum)
@@ -73,8 +75,20 @@ func NewChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
 			delete(queue.table, string(bs))
 			queue.lock.Unlock()
 
-			handle(datum)
+			if u := handle(datum); u != nil {
+				if queue.IsPaused() {
+					// We can only pushback to the channel if we're paused.
+					go func() {
+						if err := queue.Push(u[0]); err != nil {
+							log.Error("Unable to push back to queue %d. Error: %v", queue.qid, err)
+						}
+					}()
+				} else {
+					unhandled = append(unhandled, u...)
+				}
+			}
 		}
+		return unhandled
 	}, config.WorkerPoolConfiguration)
 
 	queue.qid = GetManager().Add(queue, ChannelUniqueQueueType, config, exemplar)
@@ -143,6 +157,42 @@ func (q *ChannelUniqueQueue) Has(data Data) (bool, error) {
 	return has, nil
 }
 
+// Flush flushes the channel with a timeout - the Flush worker will be registered as a flush worker with the manager
+func (q *ChannelUniqueQueue) Flush(timeout time.Duration) error {
+	if q.IsPaused() {
+		return nil
+	}
+	ctx, cancel := q.commonRegisterWorkers(1, timeout, true)
+	defer cancel()
+	return q.FlushWithContext(ctx)
+}
+
+// FlushWithContext is very similar to CleanUp but it will return as soon as the dataChan is empty
+func (q *ChannelUniqueQueue) FlushWithContext(ctx context.Context) error {
+	log.Trace("ChannelUniqueQueue: %d Flush", q.qid)
+	paused, _ := q.IsPausedIsResumed()
+	for {
+		select {
+		case <-paused:
+			return nil
+		default:
+		}
+		select {
+		case data := <-q.dataChan:
+			if unhandled := q.handle(data); unhandled != nil {
+				log.Error("Unhandled Data whilst flushing queue %d", q.qid)
+			}
+			atomic.AddInt64(&q.numInQueue, -1)
+		case <-q.baseCtx.Done():
+			return q.baseCtx.Err()
+		case <-ctx.Done():
+			return ctx.Err()
+		default:
+			return nil
+		}
+	}
+}
+
 // Shutdown processing from this queue
 func (q *ChannelUniqueQueue) Shutdown() {
 	log.Trace("ChannelUniqueQueue: %s Shutting down", q.name)
diff --git a/modules/queue/unique_queue_channel_test.go b/modules/queue/unique_queue_channel_test.go
new file mode 100644
index 0000000000..ef6752079e
--- /dev/null
+++ b/modules/queue/unique_queue_channel_test.go
@@ -0,0 +1,252 @@
+// Copyright 2019 The Gitea Authors. All rights reserved.
+// Use of this source code is governed by a MIT-style
+// license that can be found in the LICENSE file.
+
+package queue
+
+import (
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestChannelUniqueQueue(t *testing.T) {
+	handleChan := make(chan *testData)
+	handle := func(data ...Data) []Data {
+		for _, datum := range data {
+			testDatum := datum.(*testData)
+			handleChan <- testDatum
+		}
+		return nil
+	}
+
+	nilFn := func(_ func()) {}
+
+	queue, err := NewChannelUniqueQueue(handle,
+		ChannelQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  0,
+				MaxWorkers:   10,
+				BlockTimeout: 1 * time.Second,
+				BoostTimeout: 5 * time.Minute,
+				BoostWorkers: 5,
+			},
+			Workers: 0,
+			Name:    "TestChannelQueue",
+		}, &testData{})
+	assert.NoError(t, err)
+
+	assert.Equal(t, queue.(*ChannelUniqueQueue).WorkerPool.boostWorkers, 5)
+
+	go queue.Run(nilFn, nilFn)
+
+	test1 := testData{"A", 1}
+	go queue.Push(&test1)
+	result1 := <-handleChan
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	err = queue.Push(test1)
+	assert.Error(t, err)
+}
+
+func TestChannelUniqueQueue_Batch(t *testing.T) {
+	handleChan := make(chan *testData)
+	handle := func(data ...Data) []Data {
+		for _, datum := range data {
+			testDatum := datum.(*testData)
+			handleChan <- testDatum
+		}
+		return nil
+	}
+
+	nilFn := func(_ func()) {}
+
+	queue, err := NewChannelUniqueQueue(handle,
+		ChannelQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  20,
+				BatchLength:  2,
+				BlockTimeout: 0,
+				BoostTimeout: 0,
+				BoostWorkers: 0,
+				MaxWorkers:   10,
+			},
+			Workers: 1,
+		}, &testData{})
+	assert.NoError(t, err)
+
+	go queue.Run(nilFn, nilFn)
+
+	test1 := testData{"A", 1}
+	test2 := testData{"B", 2}
+
+	queue.Push(&test1)
+	go queue.Push(&test2)
+
+	result1 := <-handleChan
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	result2 := <-handleChan
+	assert.Equal(t, test2.TestString, result2.TestString)
+	assert.Equal(t, test2.TestInt, result2.TestInt)
+
+	err = queue.Push(test1)
+	assert.Error(t, err)
+}
+
+func TestChannelUniqueQueue_Pause(t *testing.T) {
+	lock := sync.Mutex{}
+	var queue Queue
+	var err error
+	pushBack := false
+	handleChan := make(chan *testData)
+	handle := func(data ...Data) []Data {
+		lock.Lock()
+		if pushBack {
+			if pausable, ok := queue.(Pausable); ok {
+				pausable.Pause()
+			}
+			pushBack = false
+			lock.Unlock()
+			return data
+		}
+		lock.Unlock()
+
+		for _, datum := range data {
+			testDatum := datum.(*testData)
+			handleChan <- testDatum
+		}
+		return nil
+	}
+	nilFn := func(_ func()) {}
+
+	queue, err = NewChannelUniqueQueue(handle,
+		ChannelQueueConfiguration{
+			WorkerPoolConfiguration: WorkerPoolConfiguration{
+				QueueLength:  20,
+				BatchLength:  1,
+				BlockTimeout: 0,
+				BoostTimeout: 0,
+				BoostWorkers: 0,
+				MaxWorkers:   10,
+			},
+			Workers: 1,
+		}, &testData{})
+	assert.NoError(t, err)
+
+	go queue.Run(nilFn, nilFn)
+
+	test1 := testData{"A", 1}
+	test2 := testData{"B", 2}
+	queue.Push(&test1)
+
+	pausable, ok := queue.(Pausable)
+	if !assert.True(t, ok) {
+		return
+	}
+	result1 := <-handleChan
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+
+	pausable.Pause()
+
+	paused, resumed := pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	queue.Push(&test2)
+
+	var result2 *testData
+	select {
+	case result2 = <-handleChan:
+		assert.Fail(t, "handler chan should be empty")
+	case <-time.After(100 * time.Millisecond):
+	}
+
+	assert.Nil(t, result2)
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result2 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test2")
+	}
+
+	assert.Equal(t, test2.TestString, result2.TestString)
+	assert.Equal(t, test2.TestInt, result2.TestInt)
+
+	lock.Lock()
+	pushBack = true
+	lock.Unlock()
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+		assert.Fail(t, "Queue should not be paused")
+		return
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue is not resumed")
+		return
+	}
+
+	queue.Push(&test1)
+
+	select {
+	case <-paused:
+	case <-handleChan:
+		assert.Fail(t, "handler chan should not contain test1")
+		return
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "queue should be paused")
+		return
+	}
+
+	paused, resumed = pausable.IsPausedIsResumed()
+
+	select {
+	case <-paused:
+	case <-resumed:
+		assert.Fail(t, "Queue should not be resumed")
+		return
+	default:
+		assert.Fail(t, "Queue is not paused")
+		return
+	}
+
+	pausable.Resume()
+
+	select {
+	case <-resumed:
+	default:
+		assert.Fail(t, "Queue should be resumed")
+	}
+
+	select {
+	case result1 = <-handleChan:
+	case <-time.After(500 * time.Millisecond):
+		assert.Fail(t, "handler chan should contain test1")
+	}
+	assert.Equal(t, test1.TestString, result1.TestString)
+	assert.Equal(t, test1.TestInt, result1.TestInt)
+}
diff --git a/modules/queue/unique_queue_disk.go b/modules/queue/unique_queue_disk.go
index bb0eb7d950..dae32f75a8 100644
--- a/modules/queue/unique_queue_disk.go
+++ b/modules/queue/unique_queue_disk.go
@@ -93,6 +93,11 @@ func (fifo *LevelUniqueQueueByteFIFO) PushFunc(ctx context.Context, data []byte,
 	return fifo.internal.LPushFunc(data, fn)
 }
 
+// PushBack pushes data to the top of the fifo
+func (fifo *LevelUniqueQueueByteFIFO) PushBack(ctx context.Context, data []byte) error {
+	return fifo.internal.RPush(data)
+}
+
 // Pop pops data from the start of the fifo
 func (fifo *LevelUniqueQueueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
 	data, err := fifo.internal.RPop()
diff --git a/modules/queue/unique_queue_disk_channel.go b/modules/queue/unique_queue_disk_channel.go
index d71f5e2b04..7fc304b17e 100644
--- a/modules/queue/unique_queue_disk_channel.go
+++ b/modules/queue/unique_queue_disk_channel.go
@@ -51,7 +51,20 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
 	}
 	config := configInterface.(PersistableChannelUniqueQueueConfiguration)
 
-	channelUniqueQueue, err := NewChannelUniqueQueue(handle, ChannelUniqueQueueConfiguration{
+	queue := &PersistableChannelUniqueQueue{
+		closed: make(chan struct{}),
+	}
+
+	wrappedHandle := func(data ...Data) (failed []Data) {
+		for _, unhandled := range handle(data...) {
+			if fail := queue.PushBack(unhandled); fail != nil {
+				failed = append(failed, fail)
+			}
+		}
+		return
+	}
+
+	channelUniqueQueue, err := NewChannelUniqueQueue(wrappedHandle, ChannelUniqueQueueConfiguration{
 		WorkerPoolConfiguration: WorkerPoolConfiguration{
 			QueueLength:  config.QueueLength,
 			BatchLength:  config.BatchLength,
@@ -84,18 +97,16 @@ func NewPersistableChannelUniqueQueue(handle HandlerFunc, cfg, exemplar interfac
 		DataDir: config.DataDir,
 	}
 
-	queue := &PersistableChannelUniqueQueue{
-		channelQueue: channelUniqueQueue.(*ChannelUniqueQueue),
-		closed:       make(chan struct{}),
-	}
+	queue.channelQueue = channelUniqueQueue.(*ChannelUniqueQueue)
 
-	levelQueue, err := NewLevelUniqueQueue(func(data ...Data) {
+	levelQueue, err := NewLevelUniqueQueue(func(data ...Data) []Data {
 		for _, datum := range data {
 			err := queue.Push(datum)
 			if err != nil && err != ErrAlreadyInQueue {
 				log.Error("Unable push to channelled queue: %v", err)
 			}
 		}
+		return nil
 	}, levelCfg, exemplar)
 	if err == nil {
 		queue.delayedStarter = delayedStarter{
@@ -142,6 +153,19 @@ func (q *PersistableChannelUniqueQueue) PushFunc(data Data, fn func() error) err
 	}
 }
 
+// PushBack will push the indexer data to queue
+func (q *PersistableChannelUniqueQueue) PushBack(data Data) error {
+	select {
+	case <-q.closed:
+		if pbr, ok := q.internal.(PushBackable); ok {
+			return pbr.PushBack(data)
+		}
+		return q.internal.Push(data)
+	default:
+		return q.channelQueue.Push(data)
+	}
+}
+
 // Has will test if the queue has the data
 func (q *PersistableChannelUniqueQueue) Has(data Data) (bool, error) {
 	// This is more difficult...
@@ -163,13 +187,14 @@ func (q *PersistableChannelUniqueQueue) Run(atShutdown, atTerminate func(func())
 
 	q.lock.Lock()
 	if q.internal == nil {
-		err := q.setInternal(atShutdown, func(data ...Data) {
+		err := q.setInternal(atShutdown, func(data ...Data) []Data {
 			for _, datum := range data {
 				err := q.Push(datum)
 				if err != nil && err != ErrAlreadyInQueue {
 					log.Error("Unable push to channelled queue: %v", err)
 				}
 			}
+			return nil
 		}, q.channelQueue.exemplar)
 		q.lock.Unlock()
 		if err != nil {
diff --git a/modules/queue/unique_queue_redis.go b/modules/queue/unique_queue_redis.go
index 7474c09665..477d5dd81f 100644
--- a/modules/queue/unique_queue_redis.go
+++ b/modules/queue/unique_queue_redis.go
@@ -105,6 +105,18 @@ func (fifo *RedisUniqueByteFIFO) PushFunc(ctx context.Context, data []byte, fn f
 	return fifo.client.RPush(ctx, fifo.queueName, data).Err()
 }
 
+// PushBack pushes data to the top of the fifo
+func (fifo *RedisUniqueByteFIFO) PushBack(ctx context.Context, data []byte) error {
+	added, err := fifo.client.SAdd(ctx, fifo.setName, data).Result()
+	if err != nil {
+		return err
+	}
+	if added == 0 {
+		return ErrAlreadyInQueue
+	}
+	return fifo.client.LPush(ctx, fifo.queueName, data).Err()
+}
+
 // Pop pops data from the start of the fifo
 func (fifo *RedisUniqueByteFIFO) Pop(ctx context.Context) ([]byte, error) {
 	data, err := fifo.client.LPop(ctx, fifo.queueName).Bytes()
diff --git a/modules/queue/unique_queue_wrapped.go b/modules/queue/unique_queue_wrapped.go
index 8c815218dd..32fa9ed970 100644
--- a/modules/queue/unique_queue_wrapped.go
+++ b/modules/queue/unique_queue_wrapped.go
@@ -73,7 +73,7 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
 
 	// wrapped.handle is passed to the delayedStarting internal queue and is run to handle
 	// data passed to
-	wrapped.handle = func(data ...Data) {
+	wrapped.handle = func(data ...Data) (unhandled []Data) {
 		for _, datum := range data {
 			wrapped.tlock.Lock()
 			if !wrapped.ready {
@@ -87,8 +87,11 @@ func NewWrappedUniqueQueue(handle HandlerFunc, cfg, exemplar interface{}) (Queue
 				}
 			}
 			wrapped.tlock.Unlock()
-			handle(datum)
+			if u := handle(datum); u != nil {
+				unhandled = append(unhandled, u...)
+			}
 		}
+		return unhandled
 	}
 	_ = GetManager().Add(queue, WrappedUniqueQueueType, config, exemplar)
 	return wrapped, nil
diff --git a/modules/queue/workerpool.go b/modules/queue/workerpool.go
index 653d0558c8..da56216dcb 100644
--- a/modules/queue/workerpool.go
+++ b/modules/queue/workerpool.go
@@ -22,6 +22,8 @@ type WorkerPool struct {
 	lock               sync.Mutex
 	baseCtx            context.Context
 	baseCtxCancel      context.CancelFunc
+	paused             chan struct{}
+	resumed            chan struct{}
 	cond               *sync.Cond
 	qid                int64
 	maxNumberOfWorkers int
@@ -35,6 +37,11 @@ type WorkerPool struct {
 	numInQueue         int64
 }
 
+var (
+	_ Flushable   = &WorkerPool{}
+	_ ManagedPool = &WorkerPool{}
+)
+
 // WorkerPoolConfiguration is the basic configuration for a WorkerPool
 type WorkerPoolConfiguration struct {
 	QueueLength  int
@@ -50,11 +57,15 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
 	ctx, cancel := context.WithCancel(context.Background())
 
 	dataChan := make(chan Data, config.QueueLength)
+	resumed := make(chan struct{})
+	close(resumed)
 	pool := &WorkerPool{
 		baseCtx:            ctx,
 		baseCtxCancel:      cancel,
 		batchLength:        config.BatchLength,
 		dataChan:           dataChan,
+		resumed:            resumed,
+		paused:             make(chan struct{}),
 		handle:             handle,
 		blockTimeout:       config.BlockTimeout,
 		boostTimeout:       config.BoostTimeout,
@@ -69,6 +80,14 @@ func NewWorkerPool(handle HandlerFunc, config WorkerPoolConfiguration) *WorkerPo
 func (p *WorkerPool) Push(data Data) {
 	atomic.AddInt64(&p.numInQueue, 1)
 	p.lock.Lock()
+	select {
+	case <-p.paused:
+		p.lock.Unlock()
+		p.dataChan <- data
+		return
+	default:
+	}
+
 	if p.blockTimeout > 0 && p.boostTimeout > 0 && (p.numberOfWorkers <= p.maxNumberOfWorkers || p.maxNumberOfWorkers < 0) {
 		if p.numberOfWorkers == 0 {
 			p.zeroBoost()
@@ -82,6 +101,17 @@ func (p *WorkerPool) Push(data Data) {
 	}
 }
 
+// HasNoWorkerScaling will return true if the queue has no workers, and has no worker boosting
+func (p *WorkerPool) HasNoWorkerScaling() bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.hasNoWorkerScaling()
+}
+
+func (p *WorkerPool) hasNoWorkerScaling() bool {
+	return p.numberOfWorkers == 0 && (p.boostTimeout == 0 || p.boostWorkers == 0 || p.maxNumberOfWorkers == 0)
+}
+
 func (p *WorkerPool) zeroBoost() {
 	ctx, cancel := context.WithTimeout(p.baseCtx, p.boostTimeout)
 	mq := GetManager().GetManagedQueue(p.qid)
@@ -272,6 +302,12 @@ func (p *WorkerPool) addWorkers(ctx context.Context, cancel context.CancelFunc,
 				p.cond.Broadcast()
 				cancel()
 			}
+			if p.hasNoWorkerScaling() {
+				log.Warn(
+					"Queue: %d is configured to be non-scaling and has no workers - this configuration is likely incorrect.\n"+
+						"The queue will be paused to prevent data-loss with the assumption that you will add workers and unpause as required.", p.qid)
+				p.pause()
+			}
 			p.lock.Unlock()
 		}()
 	}
@@ -290,13 +326,65 @@ func (p *WorkerPool) Wait() {
 	p.cond.Wait()
 }
 
+// IsPaused returns if the pool is paused
+func (p *WorkerPool) IsPaused() bool {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	select {
+	case <-p.paused:
+		return true
+	default:
+		return false
+	}
+}
+
+// IsPausedIsResumed returns if the pool is paused and a channel that is closed when it is resumed
+func (p *WorkerPool) IsPausedIsResumed() (<-chan struct{}, <-chan struct{}) {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	return p.paused, p.resumed
+}
+
+// Pause pauses the WorkerPool
+func (p *WorkerPool) Pause() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	p.pause()
+}
+
+func (p *WorkerPool) pause() {
+	select {
+	case <-p.paused:
+	default:
+		p.resumed = make(chan struct{})
+		close(p.paused)
+	}
+}
+
+// Resume resumes the WorkerPool
+func (p *WorkerPool) Resume() {
+	p.lock.Lock()
+	defer p.lock.Unlock()
+	select {
+	case <-p.resumed:
+	default:
+		p.paused = make(chan struct{})
+		close(p.resumed)
+	}
+}
+
 // CleanUp will drain the remaining contents of the channel
 // This should be called after AddWorkers context is closed
 func (p *WorkerPool) CleanUp(ctx context.Context) {
 	log.Trace("WorkerPool: %d CleanUp", p.qid)
 	close(p.dataChan)
 	for data := range p.dataChan {
-		p.handle(data)
+		if unhandled := p.handle(data); unhandled != nil {
+			if unhandled != nil {
+				log.Error("Unhandled Data in clean-up of queue %d", p.qid)
+			}
+		}
+
 		atomic.AddInt64(&p.numInQueue, -1)
 		select {
 		case <-ctx.Done():
@@ -327,7 +415,9 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 	for {
 		select {
 		case data := <-p.dataChan:
-			p.handle(data)
+			if unhandled := p.handle(data); unhandled != nil {
+				log.Error("Unhandled Data whilst flushing queue %d", p.qid)
+			}
 			atomic.AddInt64(&p.numInQueue, -1)
 		case <-p.baseCtx.Done():
 			return p.baseCtx.Err()
@@ -341,13 +431,45 @@ func (p *WorkerPool) FlushWithContext(ctx context.Context) error {
 
 func (p *WorkerPool) doWork(ctx context.Context) {
 	delay := time.Millisecond * 300
+
+	// Create a common timer - we will use this elsewhere
+	timer := time.NewTimer(0)
+	util.StopTimer(timer)
+
+	paused, _ := p.IsPausedIsResumed()
 	data := make([]Data, 0, p.batchLength)
 	for {
 		select {
+		case <-paused:
+			log.Trace("Worker for Queue %d Pausing", p.qid)
+			if len(data) > 0 {
+				log.Trace("Handling: %d data, %v", len(data), data)
+				if unhandled := p.handle(data...); unhandled != nil {
+					log.Error("Unhandled Data in queue %d", p.qid)
+				}
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+			}
+			_, resumed := p.IsPausedIsResumed()
+			select {
+			case <-resumed:
+				paused, _ = p.IsPausedIsResumed()
+				log.Trace("Worker for Queue %d Resuming", p.qid)
+				util.StopTimer(timer)
+			case <-ctx.Done():
+				log.Trace("Worker shutting down")
+				return
+			}
+		default:
+		}
+		select {
+		case <-paused:
+			// go back around
 		case <-ctx.Done():
 			if len(data) > 0 {
 				log.Trace("Handling: %d data, %v", len(data), data)
-				p.handle(data...)
+				if unhandled := p.handle(data...); unhandled != nil {
+					log.Error("Unhandled Data in queue %d", p.qid)
+				}
 				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 			}
 			log.Trace("Worker shutting down")
@@ -357,59 +479,36 @@ func (p *WorkerPool) doWork(ctx context.Context) {
 				// the dataChan has been closed - we should finish up:
 				if len(data) > 0 {
 					log.Trace("Handling: %d data, %v", len(data), data)
-					p.handle(data...)
+					if unhandled := p.handle(data...); unhandled != nil {
+						log.Error("Unhandled Data in queue %d", p.qid)
+					}
 					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				}
 				log.Trace("Worker shutting down")
 				return
 			}
 			data = append(data, datum)
+			util.StopTimer(timer)
+
 			if len(data) >= p.batchLength {
 				log.Trace("Handling: %d data, %v", len(data), data)
-				p.handle(data...)
+				if unhandled := p.handle(data...); unhandled != nil {
+					log.Error("Unhandled Data in queue %d", p.qid)
+				}
 				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
 				data = make([]Data, 0, p.batchLength)
+			} else {
+				timer.Reset(delay)
 			}
-		default:
-			timer := time.NewTimer(delay)
-			select {
-			case <-ctx.Done():
-				util.StopTimer(timer)
-				if len(data) > 0 {
-					log.Trace("Handling: %d data, %v", len(data), data)
-					p.handle(data...)
-					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+		case <-timer.C:
+			delay = time.Millisecond * 100
+			if len(data) > 0 {
+				log.Trace("Handling: %d data, %v", len(data), data)
+				if unhandled := p.handle(data...); unhandled != nil {
+					log.Error("Unhandled Data in queue %d", p.qid)
 				}
-				log.Trace("Worker shutting down")
-				return
-			case datum, ok := <-p.dataChan:
-				util.StopTimer(timer)
-				if !ok {
-					// the dataChan has been closed - we should finish up:
-					if len(data) > 0 {
-						log.Trace("Handling: %d data, %v", len(data), data)
-						p.handle(data...)
-						atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
-					}
-					log.Trace("Worker shutting down")
-					return
-				}
-				data = append(data, datum)
-				if len(data) >= p.batchLength {
-					log.Trace("Handling: %d data, %v", len(data), data)
-					p.handle(data...)
-					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
-					data = make([]Data, 0, p.batchLength)
-				}
-			case <-timer.C:
-				delay = time.Millisecond * 100
-				if len(data) > 0 {
-					log.Trace("Handling: %d data, %v", len(data), data)
-					p.handle(data...)
-					atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
-					data = make([]Data, 0, p.batchLength)
-				}
-
+				atomic.AddInt64(&p.numInQueue, -1*int64(len(data)))
+				data = make([]Data, 0, p.batchLength)
 			}
 		}
 	}
diff --git a/options/locale/locale_en-US.ini b/options/locale/locale_en-US.ini
index 301bd4f66e..8ee7347c0d 100644
--- a/options/locale/locale_en-US.ini
+++ b/options/locale/locale_en-US.ini
@@ -2803,6 +2803,12 @@ monitor.queue.pool.flush.title = Flush Queue
 monitor.queue.pool.flush.desc = Flush will add a worker that will terminate once the queue is empty, or it times out.
 monitor.queue.pool.flush.submit = Add Flush Worker
 monitor.queue.pool.flush.added = Flush Worker added for %[1]s
+monitor.queue.pool.pause.title = Pause Queue
+monitor.queue.pool.pause.desc = Pausing a Queue will prevent it from processing data
+monitor.queue.pool.pause.submit = Pause Queue
+monitor.queue.pool.resume.title = Resume Queue
+monitor.queue.pool.resume.desc = Set this queue to resume work
+monitor.queue.pool.resume.submit = Resume Queue
 
 monitor.queue.settings.title = Pool Settings
 monitor.queue.settings.desc = Pools dynamically grow with a boost in response to their worker queue blocking. These changes will not affect current worker groups.
diff --git a/routers/web/admin/admin.go b/routers/web/admin/admin.go
index 276e1939ad..fac3ef9622 100644
--- a/routers/web/admin/admin.go
+++ b/routers/web/admin/admin.go
@@ -394,6 +394,30 @@ func Flush(ctx *context.Context) {
 	ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
 }
 
+// Pause pauses a queue
+func Pause(ctx *context.Context) {
+	qid := ctx.ParamsInt64("qid")
+	mq := queue.GetManager().GetManagedQueue(qid)
+	if mq == nil {
+		ctx.Status(404)
+		return
+	}
+	mq.Pause()
+	ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
+}
+
+// Resume resumes a queue
+func Resume(ctx *context.Context) {
+	qid := ctx.ParamsInt64("qid")
+	mq := queue.GetManager().GetManagedQueue(qid)
+	if mq == nil {
+		ctx.Status(404)
+		return
+	}
+	mq.Resume()
+	ctx.Redirect(setting.AppSubURL + "/admin/monitor/queue/" + strconv.FormatInt(qid, 10))
+}
+
 // AddWorkers adds workers to a worker group
 func AddWorkers(ctx *context.Context) {
 	qid := ctx.ParamsInt64("qid")
diff --git a/routers/web/web.go b/routers/web/web.go
index 6415788e44..4c50229906 100644
--- a/routers/web/web.go
+++ b/routers/web/web.go
@@ -402,6 +402,8 @@ func RegisterRoutes(m *web.Route) {
 				m.Post("/add", admin.AddWorkers)
 				m.Post("/cancel/{pid}", admin.WorkerCancel)
 				m.Post("/flush", admin.Flush)
+				m.Post("/pause", admin.Pause)
+				m.Post("/resume", admin.Resume)
 			})
 		})
 
diff --git a/services/mailer/mailer.go b/services/mailer/mailer.go
index eeb98b5879..3ca9b50fc6 100644
--- a/services/mailer/mailer.go
+++ b/services/mailer/mailer.go
@@ -346,7 +346,7 @@ func NewContext() {
 		Sender = &dummySender{}
 	}
 
-	mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) {
+	mailQueue = queue.CreateQueue("mail", func(data ...queue.Data) []queue.Data {
 		for _, datum := range data {
 			msg := datum.(*Message)
 			gomailMsg := msg.ToMessage()
@@ -357,6 +357,7 @@ func NewContext() {
 				log.Trace("E-mails sent %s: %s", gomailMsg.GetHeader("To"), msg.Info)
 			}
 		}
+		return nil
 	}, &Message{})
 
 	go graceful.GetManager().RunWithShutdownFns(mailQueue.Run)
diff --git a/services/mirror/mirror.go b/services/mirror/mirror.go
index 2643200174..6f285ec467 100644
--- a/services/mirror/mirror.go
+++ b/services/mirror/mirror.go
@@ -130,11 +130,12 @@ func Update(ctx context.Context, pullLimit, pushLimit int) error {
 	return nil
 }
 
-func queueHandle(data ...queue.Data) {
+func queueHandle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		req := datum.(*SyncRequest)
 		doMirrorSync(graceful.GetManager().ShutdownContext(), req)
 	}
+	return nil
 }
 
 // InitSyncMirrors initializes a go routine to sync the mirrors
diff --git a/services/pull/check.go b/services/pull/check.go
index 3615c6c654..2203fb8749 100644
--- a/services/pull/check.go
+++ b/services/pull/check.go
@@ -216,12 +216,13 @@ func InitializePullRequests(ctx context.Context) {
 }
 
 // handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		id, _ := strconv.ParseInt(datum.(string), 10, 64)
 
 		testPR(id)
 	}
+	return nil
 }
 
 func testPR(id int64) {
diff --git a/services/pull/check_test.go b/services/pull/check_test.go
index f0ec096ea9..4cdd17cc7b 100644
--- a/services/pull/check_test.go
+++ b/services/pull/check_test.go
@@ -22,11 +22,12 @@ func TestPullRequest_AddToTaskQueue(t *testing.T) {
 
 	idChan := make(chan int64, 10)
 
-	q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) {
+	q, err := queue.NewChannelUniqueQueue(func(data ...queue.Data) []queue.Data {
 		for _, datum := range data {
 			id, _ := strconv.ParseInt(datum.(string), 10, 64)
 			idChan <- id
 		}
+		return nil
 	}, queue.ChannelUniqueQueueConfiguration{
 		WorkerPoolConfiguration: queue.WorkerPoolConfiguration{
 			QueueLength: 10,
diff --git a/services/repository/archiver/archiver.go b/services/repository/archiver/archiver.go
index f982e2ef7b..ad2141ef33 100644
--- a/services/repository/archiver/archiver.go
+++ b/services/repository/archiver/archiver.go
@@ -246,7 +246,7 @@ var archiverQueue queue.UniqueQueue
 
 // Init initlize archive
 func Init() error {
-	handler := func(data ...queue.Data) {
+	handler := func(data ...queue.Data) []queue.Data {
 		for _, datum := range data {
 			archiveReq, ok := datum.(*ArchiveRequest)
 			if !ok {
@@ -258,6 +258,7 @@ func Init() error {
 				log.Error("Archive %v failed: %v", datum, err)
 			}
 		}
+		return nil
 	}
 
 	archiverQueue = queue.CreateUniqueQueue("repo-archive", handler, new(ArchiveRequest))
diff --git a/services/repository/push.go b/services/repository/push.go
index 518ad04157..fafe4736ab 100644
--- a/services/repository/push.go
+++ b/services/repository/push.go
@@ -33,13 +33,14 @@ import (
 var pushQueue queue.Queue
 
 // handle passed PR IDs and test the PRs
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		opts := datum.([]*repo_module.PushUpdateOptions)
 		if err := pushUpdates(opts); err != nil {
 			log.Error("pushUpdate failed: %v", err)
 		}
 	}
+	return nil
 }
 
 func initPushQueue() error {
diff --git a/services/task/task.go b/services/task/task.go
index 376fe1dce1..3f823fc224 100644
--- a/services/task/task.go
+++ b/services/task/task.go
@@ -49,13 +49,14 @@ func Init() error {
 	return nil
 }
 
-func handle(data ...queue.Data) {
+func handle(data ...queue.Data) []queue.Data {
 	for _, datum := range data {
 		task := datum.(*models.Task)
 		if err := Run(task); err != nil {
 			log.Error("Run task failed: %v", err)
 		}
 	}
+	return nil
 }
 
 // MigrateRepository add migration repository to task
diff --git a/templates/admin/queue.tmpl b/templates/admin/queue.tmpl
index 3d9cc95592..d2d2c83baf 100644
--- a/templates/admin/queue.tmpl
+++ b/templates/admin/queue.tmpl
@@ -92,6 +92,35 @@
 				</div>
 			</form>
 		</div>
+		{{if .Queue.Pausable}}
+			{{if .Queue.IsPaused}}
+				<h4 class="ui top attached header">
+					{{.i18n.Tr "admin.monitor.queue.pool.resume.title"}}
+				</h4>
+				<div class="ui attached segment">
+					<p>{{.i18n.Tr "admin.monitor.queue.pool.resume.desc"}}</p>
+					<form method="POST" action="{{.Link}}/resume">
+						{{$.CsrfTokenHtml}}
+						<div class="ui form">
+							<button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.resume.submit"}}</button>
+						</div>
+					</form>
+				</div>
+			{{else}}
+				<h4 class="ui top attached header">
+					{{.i18n.Tr "admin.monitor.queue.pool.pause.title"}}
+				</h4>
+				<div class="ui attached segment">
+					<p>{{.i18n.Tr "admin.monitor.queue.pool.pause.desc"}}</p>
+					<form method="POST" action="{{.Link}}/pause">
+						{{$.CsrfTokenHtml}}
+						<div class="ui form">
+							<button class="ui submit button">{{.i18n.Tr "admin.monitor.queue.pool.pause.submit"}}</button>
+						</div>
+					</form>
+				</div>
+			{{end}}
+		{{end}}
 		<h4 class="ui top attached header">
 			{{.i18n.Tr "admin.monitor.queue.pool.flush.title"}}
 		</h4>