Queue API

Basic Definitions:

  • ttl - Time to Live of task.
  • ttr - Time to Release of task.
  • pri - Priority of task.
  • delay - Delay for task to be added to queue.

Warning

Don’t use constructor of Task and Tube. Task’s are created by Tube and Queue methods. For creating Tube object use Queue.tube(name)

class tarantool_queue.Queue(host='localhost', port=33013, space=0, schema=None)[source]

Tarantool queue wrapper. Surely pinned to space. May create tubes. By default it uses msgpack for serialization, but you may redefine serialize and deserialize methods. You must use Queue only for creating Tubes. For more usage, please, look into tests. Usage:

>>> from tntqueue import Queue
>>> queue = Queue()
>>> tube1 = queue.create_tube('holy_grail', ttl=100, delay=5)
# Put task into the queue
>>> tube1.put([1, 2, 3])
# Put task into the beggining of queue (Highest priority)
>>> tube1.urgent([2, 3, 4])
>>> tube1.get() # We get task and automaticaly release it
>>> task1 = tube1.take()
>>> task2 = tube1.take()
>>> print(task1.data)
    [2, 3, 4]
>>> print(task2.data)
    [1, 2, 3]
>>> del task2
>>> del task1
>>> print(tube1.take().data)
    [1, 2, 3]
# Take task and Ack it
>>> tube1.take().ack()
    True
DataBaseError

alias of DatabaseError

exception NetworkError(orig_exception=None, *args)

Error related to network

Queue.deserialize

Deserialize function: must be Callable. If sets to None or delete, then it will use msgpack for deserializing.

Queue.peek(task_id)[source]

Return a task by task id.

Parameters:task_id (string) – UUID of task in HEX
Return type:Task instance
Queue.serialize

Serialize function: must be Callable. If sets to None or deleted, then it will use msgpack for serializing.

Queue.statistics(tube=None)[source]

Return queue module statistics accumulated since server start. Output format: if tube != None, then output is dictionary with stats of current tube. If tube is None, then output is dict of t stats, ...} e.g.:

>>> tube.statistics()
# or queue.statistics('tube0')
# or queue.statistics(tube.opt['tube'])
{'ack': '233',
'meta': '35',
'put': '153',
'release': '198',
'take': '431',
'take_timeout': '320',
'tasks': {'buried': '0',
        'delayed': '0',
        'done': '0',
        'ready': '0',
        'taken': '0',
        'total': '0'},
'urgent': '80'}
or
>>> queue.statistics()
{'tube0': {'ack': '233',
        'meta': '35',
        'put': '153',
        'release': '198',
        'take': '431',
        'take_timeout': '320',
        'tasks': {'buried': '0',
                'delayed': '0',
                'done': '0',
                'ready': '0',
                'taken': '0',
                'total': '0'},
        'urgent': '80'}}
Parameters:tube (string or None) – Name of tube
Return type:dict with statistics
Queue.tarantool_connection

Tarantool Connection class: must be class with methods call and __init__. If it sets to None or deleted - it will use the default tarantool.Connection class for connection.

Queue.tarantool_lock

Locking class: must be locking instance with methods __enter__ and __exit__. If it sets to None or delete - it will use default threading.Lock() instance for locking in the connecting.

Queue.tube(name, **kwargs)[source]

Create Tube object, if not created before, and set kwargs. If existed, return existed Tube.

Parameters:
  • name (string) – name of Tube
  • delay (int) – default delay for Tube tasks (Not necessary, will be 0)
  • ttl (int) – default TTL for Tube tasks (Not necessary, will be 0)
  • ttr (int) – default TTR for Tube tasks (Not necessary, will be 0)
  • pri (int) – default priority for Tube tasks (Not necessary)
Return type:

Tube instance