Queue creation

pgq.create_queue(queue_name text)

Initialize event queue.

Returns 0 if event queue already exists, 1 otherwise.

Producer

pgq.insert_event(queue_name text, ev_type, ev_data)
pgq.insert_event(queue_name text, ev_type, ev_data, extra1, extra2, extra3, extra4)

Generate new event. This should be called inside main tx - thus rollbacked with it if needed.

Consumer

pgq.register_consumer(queue_name text, consumer_id text)

Attaches this consumer to particular event queue.

Returns 0 if the consumer was already attached, 1 otherwise.

pgq.unregister_consumer(queue_name text, consumer_id text)

Unregister and drop resources allocated to customer.

pgq.next_batch(queue_name text, consumer_id text)

Allocates next batch of events to consumer.

Returns batch id (int8), to be used in processing functions. If no batches are available, returns NULL. That means that the ticker has not cut them yet. This is the appropriate moment for consumer to sleep.

pgq.get_batch_events(batch_id int8)

pgq.get_batch_events() returns a set of events in this batch.

There may be no events in the batch. This is normal. The batch must still be closed with pgq.finish_batch().

Event fields: (ev_id int8, ev_time timestamptz, ev_txid int8, ev_retry int4, ev_type text, ev_data text, ev_extra1, ev_extra2, ev_extra3, ev_extra4)

pgq.event_failed(batch_id int8, event_id int8, reason text)

Tag event as failed - it will be stored, but not further processing is done.

pgq.event_retry(batch_id int8, event_id int8, retry_seconds int4)

Tag event for retry - after x seconds the event will be re-inserted into main queue.

pgq.finish_batch(batch_id int8)

Tag batch as finished. Until this is not done, the consumer will get same batch again.

After calling finish_batch consumer cannot do any operations with events of that batch. All operations must be done before.

Failed queue operation

Events tagged as failed just stay on their queue. Following functions can be used to manage them.

pgq.failed_event_list(queue_name, consumer)
pgq.failed_event_list(queue_name, consumer, cnt, offset)
pgq.failed_event_count(queue_name, consumer)

Get info about the queue.

Event fields are same as for pgq.get_batch_events()

pgq.failed_event_delete(queue_name, consumer, event_id)
pgq.failed_event_retry(queue_name, consumer, event_id)

Remove an event from queue, or retry it.

Info operations

pgq.get_queue_info()

Get list of queues.

Result: (queue_name, queue_ntables, queue_cur_table, queue_rotation_period, queue_switch_time, queue_external_ticker, queue_ticker_max_count, queue_ticker_max_lag, queue_ticker_idle_period, ticker_lag)

pgq.get_consumer_info()
pgq.get_consumer_info(queue_name)
pgq.get_consumer_info(queue_name, consumer)

Get list of active consumers.

Result: (queue_name, consumer_name, lag, last_seen, last_tick, current_batch, next_tick)

pgq.get_batch_info(batch_id)

Get info about batch.

Result fields: (queue_name, consumer_name, batch_start, batch_end, prev_tick_id, tick_id, lag)

Notes

Consumer must be able to process same event several times.

Example

First, create event queue:

select pgq.create_queue('LogEvent');

Then, producer side can do whenever it wishes:

select pgq.insert_event('LogEvent', 'data', 'DataFor123');

First step for consumer is to register:

select pgq.register_consumer('LogEvent', 'TestConsumer');

Then it can enter into consuming loop:

begin;
select pgq.next_batch('LogEvent', 'TestConsumer'); [into batch_id]
commit;

That will reserve a batch of events for this consumer.

To see the events in batch:

select * from pgq.get_batch_events(batch_id);

That will give all events in batch. The processing does not need to be happen all in one transaction, framework can split the work how it wants.

If a events failed or needs to be tried again, framework can call:

select pgq.event_retry(batch_id, event_id, 60);
select pgq.event_failed(batch_id, event_id, 'Record deleted');

When all done, notify database about it:

select pgq.finish_batch(batch_id)