Public Functions

Summary
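pgq.create_queue(1): Creates new queue with given name.
pgq.drop_queue(1): Drop queue and all associated tables.
pgq.insert_event(3): Insert an event into queue.
pgq.insert_event(7): Insert an event into queue with all the extra fields.
pgq.current_event_table(1): Return active event table for particular queue.
pgq.register_consumer(2): Subscribe consumer on a queue.
pgq.register_consumer(3): Extended registration that allows specifying the tick ID.
pgq.unregister_consumer(2): Unsubscribe consumer from the queue.
pgq.next_batch(2): Makes next block of events active.
pgq.get_batch_events(1): Get all events in batch.
pgq.event_failed(3): Copies the event to failed queue so it can be looked at later.
pgq.event_retry(3): Put the event into retry queue, to be processed again later (retry time given as timestamptz).
pgq.event_retry(3): Put the event into retry queue, to be processed again later (retry delay given in seconds).
pgq.finish_batch(1): Closes a batch.
pgq.get_queue_info(0): Get info about all queues.
pgq.get_queue_info(1): Get info about particular queue.
pgq.get_consumer_info(0): Returns info about all consumers on all queues.
pgq.get_consumer_info(1): Returns info about consumers on one particular queue.
pgq.get_consumer_info(2): Get info about particular consumer on particular queue.
pgq.version(0): Returns version string for pgq.
pgq.get_batch_info(1): Returns detailed info about a batch.
pgq.failed_event_list(2): Get list of all failed events for one consumer.
pgq.failed_event_list(4): Get list of failed events, starting at a given offset and limited to a given count.
pgq.failed_event_count(2): Get size of failed event queue.
pgq.failed_event_delete(3): Delete specific event from failed event queue.
pgq.failed_event_retry(3): Insert specific event from failed queue to main queue.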

Queue creation

pgq.create_queue(1)

pgq.create_queue(i_queue_name text) returns integer

Creates new queue with given name.

Returns

0: queue already exists
1: queue created
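
A minimal sketch of calling pgq.create_queue from client code and interpreting the 0/1 result. It assumes a Python DB-API 2.0 cursor that uses psycopg2-style %s placeholders; the helper name ensure_queue is hypothetical and not part of pgq.

```python
def ensure_queue(cur, queue_name):
    """Create the queue if needed; return True only if it was newly created.

    ASSUMPTION: `cur` is a DB-API 2.0 cursor with %s-style placeholders
    (for example one obtained from psycopg2). Committing is left to the caller.
    """
    cur.execute("select pgq.create_queue(%s)", (queue_name,))
    (result,) = cur.fetchone()
    # Per the Returns list above: 1 = queue created, 0 = queue already exists.
    return result == 1
```

Calling it twice with the same name should return True the first time and False the second, since the second call finds the queue already present.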

pgq.drop_queue(1)

pgq.drop_queue(x_queue_name text) returns integer

Drop queue and all associated tables.  No consumers may be listening on the queue.

Event publishing

pgq.insert_event(3)

pgq.insert_event(queue_name text,
ev_type text,
ev_data text) returns bigint

Insert an event into queue.

Parameters

queue_name: Name of the queue
ev_type: User-specified type for the event
ev_data: User data for the event

Returns

Event ID

pgq.insert_event(7)

pgq.insert_event(queue_name text,
ev_type text,
ev_data text,
ev_extra1 text,
ev_extra2 text,
ev_extra3 text,
ev_extra4 text) returns bigint

Insert an event into queue with all the extra fields.

Parameters

queue_name: Name of the queue
ev_type: User-specified type for the event
ev_data: User data for the event
ev_extra1: Extra data field for the event
ev_extra2: Extra data field for the event
ev_extra3: Extra data field for the event
ev_extra4: Extra data field for the event

Returns

Event ID
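
The two insert_event variants can be wrapped in one producer helper. This is a sketch only: it assumes a DB-API 2.0 cursor with psycopg2-style %s placeholders, and the name publish and its extras argument are hypothetical.

```python
def publish(cur, queue_name, ev_type, ev_data, extras=None):
    """Insert one event and return its event ID.

    ASSUMPTION: `cur` is a DB-API 2.0 cursor with %s-style placeholders.
    `extras`, if given, must hold exactly four values for ev_extra1..ev_extra4.
    """
    if extras is None:
        # Three-argument variant: queue name, type and data only.
        cur.execute(
            "select pgq.insert_event(%s, %s, %s)",
            (queue_name, ev_type, ev_data),
        )
    else:
        # Seven-argument variant; unpacking fails unless there are four extras.
        extra1, extra2, extra3, extra4 = extras
        cur.execute(
            "select pgq.insert_event(%s, %s, %s, %s, %s, %s, %s)",
            (queue_name, ev_type, ev_data, extra1, extra2, extra3, extra4),
        )
    return cur.fetchone()[0]
```

The event becomes visible to consumers only after the caller commits the surrounding transaction.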

pgq.current_event_table(1)

pgq.current_event_table(x_queue_name text) returns text

Return active event table for particular queue.  Events can be added to it directly, without going through the insert functions, e.g. by COPY.

Note

The result is valid only during current transaction.

Permissions

Actual insertion requires superuser access.

Parameters

x_queue_name: Queue name.

Subscribing to queue

pgq.register_consumer(2)

pgq.register_consumer(x_queue_name text,
x_consumer_id text) returns integer

Subscribe consumer on a queue.

From this moment forward, consumer will see all events in the queue.

Parameters

x_queue_name: Name of queue
x_consumer_id: Name of consumer

Returns

0: if already registered
1: if new registration

pgq.register_consumer(3)

pgq.register_consumer(x_queue_name text,
x_consumer_name text,
x_tick_pos bigint) returns integer

Extended registration that allows specifying the tick ID.

Note

For usage in special situations.

Parameters

x_queue_name: Name of a queue
x_consumer_name: Name of consumer
x_tick_pos: Tick ID

Returns

0 if the consumer was already registered, 1 if this is a new registration.

pgq.unregister_consumer(2)

pgq.unregister_consumer(x_queue_name text,
x_consumer_name text) returns integer

Unsubscribe consumer from the queue.  The consumer’s failed and retry events are also deleted.

Parameters

x_queue_name: Name of the queue
x_consumer_name: Name of the consumer

Returns

nothing
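
Subscribing and unsubscribing might be wrapped as below. This is a sketch under assumptions: a DB-API 2.0 cursor with psycopg2-style %s placeholders, and hypothetical helper names subscribe and unsubscribe.

```python
def subscribe(cur, queue_name, consumer_name):
    """Register the consumer; return True if this was a new registration.

    ASSUMPTION: `cur` is a DB-API 2.0 cursor with %s-style placeholders.
    """
    cur.execute(
        "select pgq.register_consumer(%s, %s)",
        (queue_name, consumer_name),
    )
    # 1 = new registration, 0 = already registered.
    return cur.fetchone()[0] == 1


def unsubscribe(cur, queue_name, consumer_name):
    """Unregister the consumer; its failed and retry events are dropped too."""
    cur.execute(
        "select pgq.unregister_consumer(%s, %s)",
        (queue_name, consumer_name),
    )
```

Because a consumer sees events only from the moment of registration, a producer that must not lose events would call subscribe (and commit) before the first insert_event.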

Batch processing

pgq.next_batch(2)

pgq.next_batch(x_queue_name text,
x_consumer_name text) returns bigint

Makes next block of events active.

If it returns NULL, there are no events available in the queue.  The consumer should then sleep for a while before trying again.

Parameters

x_queue_name: Name of the queue
x_consumer_name: Name of the consumer

Returns

Batch ID or NULL if there are no more events available.

pgq.get_batch_events(1)

pgq.get_batch_events(x_batch_id bigint) returns setof pgq.ret_batch_event

Get all events in batch.

Parameters

x_batch_id: ID of active batch.

Returns

List of events.

pgq.event_failed(3)

pgq.event_failed(x_batch_id bigint,
x_event_id bigint,
x_reason text) returns integer

Copies the event to failed queue so it can be looked at later.

Parameters

x_batch_id: ID of active batch.
x_event_id: Event ID
x_reason: Text to associate with event.

Returns

0 if event was already in the failed queue, 1 otherwise.

pgq.event_retry(3)

pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_time timestamptz) returns integer

Put the event into retry queue, to be processed again later.

Parameters

x_batch_id: ID of active batch.
x_event_id: Event ID
x_retry_time: Time when the event should be put back into queue

Returns

nothing

pgq.event_retry(3)

pgq.event_retry(x_batch_id bigint,
x_event_id bigint,
x_retry_seconds integer) returns integer

Put the event into retry queue, to be processed again later.

Parameters

x_batch_id: ID of active batch.
x_event_id: Event ID
x_retry_seconds: Number of seconds after which the event should be put back into queue

Returns

nothing

pgq.finish_batch(1)

pgq.finish_batch(x_batch_id bigint) returns integer

Closes a batch.  No more operations can be done with events of this batch.

Parameters

x_batch_id: ID of batch.

Returns

1 if batch was found, 0 otherwise.
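
The batch functions are meant to be used together: next_batch, then get_batch_events, then per-event event_retry or event_failed as needed, then finish_batch. The sketch below shows one such pass. It assumes a DB-API 2.0 cursor with psycopg2-style %s placeholders, and it assumes pgq.ret_batch_event exposes columns named ev_id, ev_type and ev_data (the record layout is not spelled out in this reference). The names process_one_batch and handle_event are hypothetical.

```python
def process_one_batch(cur, queue_name, consumer_name, handle_event,
                      retry_seconds=60):
    """Process at most one batch.

    Returns the number of events in the batch, or None if no batch was
    available (the caller should then sleep before calling again).

    ASSUMPTIONS: `cur` is a DB-API 2.0 cursor with %s-style placeholders;
    the batch event record has columns ev_id, ev_type and ev_data.
    """
    cur.execute("select pgq.next_batch(%s, %s)", (queue_name, consumer_name))
    batch_id = cur.fetchone()[0]
    if batch_id is None:
        return None

    cur.execute(
        "select ev_id, ev_type, ev_data from pgq.get_batch_events(%s)",
        (batch_id,),
    )
    events = cur.fetchall()

    for ev_id, ev_type, ev_data in events:
        try:
            handle_event(ev_type, ev_data)
        except Exception:
            # Schedule the event for another attempt; the cast selects the
            # integer (seconds) variant of event_retry.
            cur.execute(
                "select pgq.event_retry(%s, %s, %s::integer)",
                (batch_id, ev_id, retry_seconds),
            )

    # Close the batch; no further event_retry/event_failed calls are allowed.
    cur.execute("select pgq.finish_batch(%s)", (batch_id,))
    return len(events)
```

The caller is expected to commit after each pass, so that the handler's own work and finish_batch take effect in the same transaction.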

General info functions

pgq.get_queue_info(0)

pgq.get_queue_info() returns setof pgq.ret_queue_info

Get info about all queues.

Returns

List of pgq.ret_queue_info records.

pgq.get_queue_info(1)

pgq.get_queue_info(qname text) returns pgq.ret_queue_info

Get info about particular queue.

Returns

One pgq.ret_queue_info record.

pgq.get_consumer_info(0)

pgq.get_consumer_info() returns setof pgq.ret_consumer_info

Returns info about all consumers on all queues.

Returns

See pgq.get_consumer_info(2)

pgq.get_consumer_info(1)

pgq.get_consumer_info(x_queue_name text) returns setof pgq.ret_consumer_info

Returns info about consumers on one particular queue.

Parameters

x_queue_name: Queue name

Returns

See pgq.get_consumer_info(2)

pgq.get_consumer_info(2)

pgq.get_consumer_info(x_queue_name text,
x_consumer_name text) returns setof pgq.ret_consumer_info

Get info about particular consumer on particular queue.

Parameters

x_queue_name: Name of a queue
x_consumer_name: Name of a consumer

Returns

queue_name: Queue name
consumer_name: Consumer name
lag: How old the events are that the consumer is processing
last_seen: When the consumer was last seen by pgq
last_tick: Tick ID of last processed tick
current_batch: Current batch ID if one is active, otherwise NULL
next_tick: If batch is active, then its final tick.
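
The lag column lends itself to simple monitoring. The sketch below lists consumers that have fallen behind on one queue. It assumes a DB-API 2.0 cursor with psycopg2-style %s placeholders, and it assumes lag is an interval that the driver returns as datetime.timedelta; the name lagging_consumers and the five-minute default are hypothetical.

```python
from datetime import timedelta


def lagging_consumers(cur, queue_name, max_lag=timedelta(minutes=5)):
    """Return the names of consumers whose lag exceeds `max_lag`.

    ASSUMPTIONS: `cur` is a DB-API 2.0 cursor with %s-style placeholders,
    and the `lag` column arrives as a datetime.timedelta (or None).
    """
    cur.execute(
        "select consumer_name, lag from pgq.get_consumer_info(%s)",
        (queue_name,),
    )
    return [
        name
        for name, lag in cur.fetchall()
        if lag is not None and lag > max_lag
    ]
```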

pgq.version(0)

pgq.version() returns text

Returns version string for pgq.  Currently this is the SkyTools version, which is bumped only when the PGQ database code changes.

pgq.get_batch_info(1)

pgq.get_batch_info(x_batch_id bigint) returns pgq.ret_batch_info

Returns detailed info about a batch.

Parameters

x_batch_id: ID of an active batch.

Returns

One pgq.ret_batch_info record.

Failed queue browsing

pgq.failed_event_list(2)

pgq.failed_event_list(x_queue_name text,
x_consumer_name text) returns setof pgq.failed_queue

Get list of all failed events for one consumer.

Parameters

x_queue_name: Queue name
x_consumer_name: Consumer name

Returns

List of failed events.

pgq.failed_event_list(4)

pgq.failed_event_list(x_queue_name text,
x_consumer_name text,
x_count integer,
x_offset integer) returns setof pgq.failed_queue

Get list of failed events, starting at a given offset and limited to a given count.

Parameters

x_queue_name: Queue name
x_consumer_name: Consumer name
x_count: Maximum number of events to fetch
x_offset: Offset to start from

Returns

List of failed events.

pgq.failed_event_count(2)

pgq.failed_event_count(x_queue_name text,
x_consumer_name text) returns integer

Get size of failed event queue.

Parameters

x_queue_name: Queue name
x_consumer_name: Consumer name

Returns

Number of failed events in failed event queue.

pgq.failed_event_delete(3)

pgq.failed_event_delete(x_queue_name text,
x_consumer_name text,
x_event_id bigint) returns integer

Delete specific event from failed event queue.

Parameters

x_queue_name: Queue name
x_consumer_name: Consumer name
x_event_id: Event ID

Returns

nothing

pgq.failed_event_retry(3)

pgq.failed_event_retry(x_queue_name text,
x_consumer_name text,
x_event_id bigint) returns bigint

Insert specific event from failed queue to main queue.

Parameters

x_queue_name: Queue name
x_consumer_name: Consumer name
x_event_id: Event ID

Returns

nothing
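
The failed queue functions can be combined to resubmit everything a consumer has marked as failed. This is a sketch only: it assumes a DB-API 2.0 cursor with psycopg2-style %s placeholders, and it assumes pgq.failed_queue has an ev_id column (its layout is not listed in this reference). The name retry_all_failed is hypothetical.

```python
def retry_all_failed(cur, queue_name, consumer_name):
    """Move every failed event of one consumer back to the main queue.

    Returns the list of event IDs that were resubmitted.

    ASSUMPTIONS: `cur` is a DB-API 2.0 cursor with %s-style placeholders;
    the failed-queue record has an `ev_id` column.
    """
    cur.execute(
        "select ev_id from pgq.failed_event_list(%s, %s)",
        (queue_name, consumer_name),
    )
    # Read the full list first, since the cursor is reused below.
    event_ids = [row[0] for row in cur.fetchall()]

    for ev_id in event_ids:
        cur.execute(
            "select pgq.failed_event_retry(%s, %s, %s)",
            (queue_name, consumer_name, ev_id),
        )
    return event_ids
```

For events that should be discarded rather than retried, the same loop could call pgq.failed_event_delete with the same three arguments.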
