API Documentation

Continuation based API

This module supports a callback style of asynchronous application development.

We depend on the pika library for low-level communication and the event loop.

class eventail.async_service.pika.Service(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, config_routing_keys: Sequence[str] = [])

This is an example service that will handle unexpected interactions with RabbitMQ such as channel and connection closures.

If RabbitMQ closes the connection, this class will stop and indicate that reconnection is necessary. You should examine the output, as there are limited reasons why the connection may be closed, usually tied to permission issues or socket timeouts.

If the channel is closed, it will indicate a problem with one of the commands that were issued and that should surface in the output as well.

To leverage the binary nature of AMQP messages, we transparently use CBOR instead of JSON for data serialization. CBOR is also faster and more compact than JSON.

BLOCKED_TIMEOUT = 3600

When RabbitMQ is low on resources, it may temporarily block the connection. If being blocked is not acceptable to the service, this timeout (in seconds) limits how long it will wait.

HEARTBEAT = 60

Heartbeat interval (in seconds); must be greater than the expected blocking processing time. Beware that the actual delay is negotiated with the broker, and the lower value is taken, so configure RabbitMQ accordingly.

PREFETCH_COUNT = 3

In production, experiment with higher prefetch values for higher consumer throughput. As a rule of thumb, you can set it to the number of messages the service can process within one second, as long as they easily fit in memory.

__init__(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, config_routing_keys: Sequence[str] = []) → None

Create a new instance of the consumer class, passing in the AMQP URLs used to connect to RabbitMQ.

Parameters: amqp_urls (List[str]) – List of AMQP urls.

The service will try to connect to one of them, in a round-robin fashion.

call_later(delay: float, callback: Callable) → object

Call callback after delay seconds.

Return a handle that can be passed to self.cancel_timer().

handle_command(command: str, payload: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, meta: Dict[str, str]) → None

Handle incoming commands (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()). Expected errors should be returned with the return_error method.

The default implementation dispatches the messages by calling methods in the form self.on_COMMAND(payload, reply_to, correlation_id) where COMMAND is what is left after stripping the service. prefix from the routing key.
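The dispatch convention above can be sketched as follows. This is an illustrative model, not the library's actual implementation; the service name pricing and the on_compute handler are hypothetical:

```python
# Illustrative sketch of the on_COMMAND dispatch convention
# (not the library's actual code; all names are hypothetical).
class DispatchSketch:
    def handle_command(self, command, payload, reply_to, correlation_id):
        # "pricing.compute" -> "compute" once the "pricing." service
        # prefix is stripped, then dispatch to self.on_compute(...).
        handler = getattr(self, "on_" + command.split(".", 1)[1])
        return handler(payload, reply_to, correlation_id)

    def on_compute(self, payload, reply_to, correlation_id):
        return {"sum": payload["a"] + payload["b"]}

sketch = DispatchSketch()
result = sketch.handle_command(
    "pricing.compute", {"a": 1, "b": 2}, "client.compute_result", "corr-1"
)
# result == {"sum": 3}
```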

handle_config(configuration: str, payload: Dict[str, Any], meta: Dict[str, str]) → bool

Handle incoming configuration (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).

configuration is the routing key.

This callback must return a boolean flag to tell Eventail whether the service configuration is done and that the service can now process business events.

The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload) -> bool where KEY is the routing key.
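A configuration handler following this convention might look like the sketch below. It is illustrative only; the routing keys rates and limits are hypothetical, and the real class wires the dispatch itself:

```python
# Illustrative sketch (not the library's actual code): report readiness
# only once every expected configuration key has been received.
class ConfigSketch:
    EXPECTED = {"rates", "limits"}  # hypothetical routing keys

    def __init__(self):
        self._seen = set()

    def handle_config(self, configuration, payload):
        handler = getattr(self, "on_" + configuration)
        handler(payload)
        self._seen.add(configuration)
        # True tells Eventail the service can now process business events.
        return self._seen >= self.EXPECTED

    def on_rates(self, payload):
        self.rates = payload

    def on_limits(self, payload):
        self.limits = payload

config = ConfigSketch()
ready_after_one = config.handle_config("rates", {"eur": 1.0})   # False
ready_after_two = config.handle_config("limits", {"max": 100})  # True
```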

handle_event(event: str, payload: Dict[str, Any], conversation_id: str, meta: Dict[str, str]) → None

Handle incoming event (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).

The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload) where KEY is the routing key.

handle_result(key: str, payload: Dict[str, Any], conversation_id: str, status: str, correlation_id: str, meta: Dict[str, str]) → None

Handle incoming result (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).

The key is the routing key and status is either “success” or “error”.

The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload, status, correlation_id) where KEY is what is left after stripping the service. prefix from the routing key.

handle_returned_message(key: str, payload: Dict[str, Any], envelope: Dict[str, str])

Invoked when a message is returned (to be implemented by subclasses).

A message may be returned if:
  • it was sent with the mandatory flag set to True;
  • and the broker was unable to route it to a queue.

log(criticity: int, short: str, full: str = '', conversation_id: str = '', additional_fields: Dict[KT, VT] = {}) → None

Log to the log bus.

Parameters:
  • criticity: int, in the syslog scale
  • short: str, short description of log
  • full: str, the full message of the log (appears as message in Graylog)
  • additional_fields: Dict, data to be merged into the GELF payload as additional fields
on_ready() → None

Code to execute once the business service is configured and comes online.

(to be implemented by subclasses)

publish_configuration(event: str, message: Dict[str, Any]) → None

Publish a configuration on the bus.

The event is the name of the configuration event, and the message is any data conforming to the JSON model.

publish_event(event: str, message: Dict[str, Any], conversation_id: str, mandatory: bool = False) → None

Publish an event on the bus.

The event is the name of the event, and the message is any data conforming to the JSON model.

If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable. The default is False because some events may not have any consumer yet.

return_error(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = True) → None

Send a failure result message.

The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.

return_success(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = True) → None

Send a successful result message.

The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.

run() → None

Run the service by connecting to RabbitMQ and then starting the IOLoop to block and allow the SelectConnection to operate.

send_command(command: str, message: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, mandatory: bool = True) → None

Send a command message.

The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.

stop(reconnect=False) → None

Cleanly shut down the connection to RabbitMQ by stopping the consumer. When RabbitMQ confirms the cancellation, on_cancelok will be invoked by pika, which will then close the channel and the connection. The IOLoop is started again because this method is invoked when CTRL-C is pressed, raising a KeyboardInterrupt exception; that exception stops the IOLoop, which needs to be running for pika to communicate with RabbitMQ. All of the commands issued prior to starting the IOLoop will be buffered but not processed.

This method is automatically triggered if we receive one of these UNIX signals: signal.SIGHUP, signal.SIGTERM, signal.SIGINT.

use_exclusive_queues() → None

Force usage of exclusive queues.

This is useful for debug tools that should not leave a queue behind them (overflow risk) and not interfere between instances.

use_json() → None

Force sending message serialized in plain JSON instead of CBOR.

class eventail.async_service.pika.ReconnectingSupervisor(service_factory: Callable[[...], eventail.async_service.pika.base.Service], *args, **kwargs)

This is an example supervisor that will reconnect if the nested Service indicates that a reconnect is necessary.

__init__(service_factory: Callable[[...], eventail.async_service.pika.base.Service], *args, **kwargs) → None

Supervises a service and manages automatic reconnection.

The *args and **kwargs are passed unchanged to the service_factory.

run() → None

Run the service until the service chooses to exit without reconnecting.
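The supervision loop can be modeled roughly as below. This is a pure-Python sketch of the behavior described above, not the real class (which delegates to the pika-based Service); FakeService is a stand-in used to exercise it:

```python
# Rough model of the supervise-and-reconnect loop (illustrative only).
class SupervisorSketch:
    def __init__(self, service_factory, *args, **kwargs):
        self._factory = service_factory
        self._args, self._kwargs = args, kwargs

    def run(self):
        while True:
            service = self._factory(*self._args, **self._kwargs)
            service.run()  # blocks until the service stops
            if not service.should_reconnect:
                break  # the service chose to exit without reconnecting

# A fake service that asks for one reconnection, then exits cleanly.
attempts = []

class FakeService:
    def __init__(self, name):
        self.name = name
        self.should_reconnect = False

    def run(self):
        attempts.append(self.name)
        if len(attempts) < 2:
            self.should_reconnect = True

SupervisorSketch(FakeService, "worker").run()
# attempts == ["worker", "worker"]: one reconnection, then a clean exit
```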

Asyncio compatible API

Depends on the aiormq library:

pip install .[asyncio]
class eventail.async_service.aio.Service(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, loop: Optional[asyncio.events.AbstractEventLoop] = None)
HEARTBEAT = 60

Heartbeat interval, must be superior to the expected blocking processing time (in seconds). Beware that the actual delay is negotiated with the broker, and the lower value is taken, so configure Rabbitmq accordingly.

PREFETCH_COUNT = 3

In production, experiment with higher prefetch values for higher consumer throughput.

__init__(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, loop: Optional[asyncio.events.AbstractEventLoop] = None) → None

Create a new instance of the consumer class, passing in the AMQP URLs used to connect to RabbitMQ.

Parameters: amqp_urls (List[str]) – List of AMQP urls to connect with.
create_task(coro: Coroutine[T_co, T_contra, V_co]) → Union[_asyncio.Task, aiormq.abc.TaskWrapper]

Launch a task.

handle_command(command: str, payload: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, meta: Dict[str, str]) → None

Handle incoming commands (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()). Expected errors should be returned with the return_error method.

The default implementation dispatches the messages by calling coroutine methods in the form self.on_COMMAND(payload, reply_to, correlation_id) where COMMAND is what is left after stripping the service. prefix from the routing key.
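The coroutine dispatch convention can be sketched with plain asyncio. Again this is illustrative, not the library's actual code; the routing key svc.echo and the on_echo handler are hypothetical:

```python
import asyncio

# Illustrative sketch of the coroutine dispatch convention
# (not the library's actual code; all names are hypothetical).
class AioDispatchSketch:
    async def handle_command(self, command, payload, reply_to, correlation_id):
        # Strip the "svc." service prefix, then await the matching coroutine.
        handler = getattr(self, "on_" + command.split(".", 1)[1])
        return await handler(payload, reply_to, correlation_id)

    async def on_echo(self, payload, reply_to, correlation_id):
        return payload

echoed = asyncio.run(
    AioDispatchSketch().handle_command(
        "svc.echo", {"text": "hello"}, "client.result", "corr-1"
    )
)
# echoed == {"text": "hello"}
```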

handle_event(event: str, payload: Dict[str, Any], conversation_id: str, meta: Dict[str, str]) → None

Handle incoming event (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).

The default implementation dispatches the messages by calling coroutine methods in the form self.on_KEY(payload) where KEY is the routing key.

handle_result(key: str, payload: Dict[str, Any], conversation_id: str, status: str, correlation_id: str, meta: Dict[str, str]) → None

Handle incoming result (may be overwritten by subclasses).

The payload is already decoded and is a python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).

The key is the routing key and status is either “success” or “error”.

The default implementation dispatches the messages by calling coroutine methods in the form self.on_KEY(payload, status, correlation_id) where KEY is what is left after stripping the service. prefix from the routing key.

log(criticity: int, short: str, full: str = '', conversation_id: str = '', additional_fields: Dict[KT, VT] = {}) → None

Log to the log bus.

Parameters:
  • criticity: int, in the syslog scale
  • short: str, short description of log
  • full: str, the full message of the log (appears as message in Graylog)
  • additional_fields: Dict, data to be merged into the GELF payload as additional fields
on_ready() → None

Code to execute once the service comes online.

(to be implemented by subclasses)

publish_event(event: str, message: Dict[str, Any], conversation_id: str, mandatory: bool = False) → None

Publish an event on the bus.

The event is the name of the event, and the message is any data conforming to the JSON model.

If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable. The default is False because some events maybe unused yet.

return_error(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = False) → None

Send a failure notification.

The message is any data conforming to the JSON model. If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable.

return_success(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = False) → None

Send a successful result message.

The message is any data conforming to the JSON model. If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable.

send_command(command: str, message: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, mandatory: bool = False) → None

Send a command message.

The message is any data conforming to the JSON model. If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable.

stop() → None

Cleanly stop the application.

This method is automatically triggered if we receive one of these UNIX signals: signal.SIGHUP, signal.SIGTERM, signal.SIGINT.

use_exclusive_queues() → None

Force usage of exclusive queues.

This is useful for debug tools that should not leave a queue behind them (overflow risk) and not interfere between instances.

use_json() → None

Force sending message serialized in plain JSON instead of CBOR.

Synchronous publisher

Depends on the kombu library:

pip install .[synchronous]
class eventail.sync_publisher.Endpoint(amqp_urls: List[str], logical_service: str, connection_max_retries: Optional[int] = None)

A synchronous publishing endpoint for AlloMedia EDA.

__init__(amqp_urls: List[str], logical_service: str, connection_max_retries: Optional[int] = None) → None

Initialize endpoint.

  • amqp_urls is a list of broker urls that will be tried in turn (round robin style) to send messages.
  • logical_service: the logical service this endpoint belongs to.
force_json()

Force serialization of payload into JSON.

log(criticity: int, short: str, full: str = '', conversation_id: str = '', additional_fields: Dict[KT, VT] = {}) → None

Log to the log bus.

Parameters:
  • criticity: int, in the syslog scale
  • short: str, short description of log
  • full: str, the full message of the log (appears as message in Graylog)
  • additional_fields: Dict, data to be merged into the GELF payload as additional fields
publish_configuration(event: str, message: Dict[str, Any], max_retries: Optional[int] = None) → None

Publish a configuration event on the bus.

The event is the name of the event, and the message is any data conforming to the JSON model. max_retries is None by default (retry forever) but can be an int to specify the number of attempts to send the message before the error is raised. The first retry is immediate, then the interval increases by one second at each step.

publish_event(event: str, message: Dict[str, Any], conversation_id: str, max_retries: Optional[int] = None) → None

Publish an event on the bus.

The event is the name of the event, and the message is any data conforming to the JSON model. max_retries is None by default (retry forever) but can be an int to specify the number of attempts to send the message before the error is raised. The first retry is immediate, then the interval increases by one second at each step.
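Reading that retry policy literally, the wait before each attempt can be modeled as below. This is an illustrative interpretation of the description, not the library's code:

```python
def retry_delays(max_retries):
    # First retry is immediate (0 s); each subsequent retry waits one
    # second longer than the previous one (illustrative model).
    return list(range(max_retries))

schedule = retry_delays(4)  # delays in seconds: [0, 1, 2, 3]
```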

Shared temporary data store

class eventail.tmp_store.STDataStore(redis_client: redis.client.Redis, namespace: str, ttl: int = 270000)

Temporary data store shared among service instances.

To avoid race conditions between workers, all operations are atomic and we provide destructive reads, so that only one worker can take ownership of a stored value.

All stored values have a common, limited time-to-live (TTL) to avoid memory leaks.

Current implementation relies on Redis.

is_down()

Return an error message if the STDataStore is down, or an empty string otherwise.

mpop(*keys) → Optional[List[Any]]

Retrieve values for multiple keys and delete them, atomically, if they are all set.

If one of the keys is not set, nothing is deleted.

Return None if the operation fails or if one of the keys does not exist.
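The all-or-nothing semantics of mset/mpop can be modeled with an in-memory dictionary. This sketch only models the documented behavior; the real store performs these operations atomically across workers via Redis:

```python
# In-memory model of the mset/mpop semantics (illustrative; the real
# implementation is atomic across workers thanks to Redis).
class MiniStore:
    def __init__(self):
        self._data = {}

    def mset(self, data):
        self._data.update(data)

    def mpop(self, *keys):
        if not all(key in self._data for key in keys):
            return None  # one key missing: nothing is deleted
        return [self._data.pop(key) for key in keys]

store = MiniStore()
store.mset({"a": 1, "b": 2})
partial = store.mpop("a", "missing")  # None, and "a" is kept
both = store.mpop("a", "b")           # [1, 2]; both keys are now gone
```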

mset(data: Dict[str, Any], ttl: Optional[int] = None) → None

Atomically store multiple key: value pairs given in dictionary.

peek(key: str) → Any

Peek at the value associated to key.

The record won’t be erased.

peek_or_create(key: str, factory: Callable[[], Any], max_op_time: int, ttl: Optional[int] = None) → Any

Get value associated with key if it exists, otherwise create it and store it.

The factory is a callable that creates new values on demand. The check and, if needed, the creation of the value are concurrency safe: if the data is missing (or its TTL has expired), only one worker will be able to create the value; all others will wait, thanks to a lock.

To avoid deadlocks, ``max_op_time`` is the maximum time in seconds the lock is held. The factory must produce a value within that time span.
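The check-then-create guarantee can be sketched with a local lock. The real store uses a distributed Redis lock bounded by max_op_time; this local model only shows that the factory runs at most once per key even under concurrent callers:

```python
import threading

# Local sketch of peek_or_create's concurrency guarantee
# (the real store uses a distributed Redis lock, not threading).
class CreateOnce:
    def __init__(self):
        self._data = {}
        self._lock = threading.Lock()
        self.factory_calls = 0

    def peek_or_create(self, key, factory):
        with self._lock:  # only one caller may create the value
            if key not in self._data:
                self.factory_calls += 1
                self._data[key] = factory()
            return self._data[key]

store = CreateOnce()
threads = [
    threading.Thread(target=store.peek_or_create, args=("token", lambda: "t-1"))
    for _ in range(4)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
# store.factory_calls == 1: the factory ran exactly once
```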

pop(key: str) → Any

Retrieve the data associated with key and delete the key atomically.

Return None if the key does not exist.

classmethod sentinel_backend(sentinels: List[str], redis_service_name: str, database: int, password: Optional[str], namespace: str, ttl: int = 270000) → eventail.tmp_store.STDataStore

Use this to connect to Redis Sentinel or Single Redis database.

If more than one server is provided as sentinels, Sentinel mode is used; otherwise fall back to single-server mode.

set(key: str, value: Any, ttl: Optional[int] = None) → None

Store value under key key.

value can be any serializable python data.

classmethod simple_redis_backend(redis_server: Tuple[str, int], database: int, password: Optional[str], namespace: str, ttl: int = 270000) → eventail.tmp_store.STDataStore

Use this to connect to a single Redis database.