API Documentation
Continuation based API
This module supports the callback style of asynchronous application development.
It depends on the pika library for low-level communication and for the event loop.
- class eventail.async_service.pika.Service(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, config_routing_keys: Sequence[str] = [])
This is an example service that will handle unexpected interactions with RabbitMQ such as channel and connection closures.
If RabbitMQ closes the connection, this class will stop and indicate that reconnection is necessary. You should look at the output, as there are limited reasons why the connection may be closed, which usually are tied to permission related issues or socket timeouts.
If the channel is closed, it will indicate a problem with one of the commands that were issued and that should surface in the output as well.
To leverage the binary nature of AMQP messages, we use CBOR instead of JSON as data serialization (transparent). Moreover, CBOR is much faster and much more compact than JSON.
- BLOCKED_TIMEOUT = 3600
When RabbitMQ is low on resources, it may temporarily block the connection. A timeout (in seconds) can be specified if such blocking is not acceptable to the service.
- HEARTBEAT = 60
Heartbeat interval (in seconds); it must be greater than the expected blocking processing time. Beware that the actual delay is negotiated with the broker, and the lower value is taken, so configure RabbitMQ accordingly.
- PREFETCH_COUNT = 3
In production, experiment with higher prefetch values for higher consumer throughput. As a rule of thumb, you can set it to the number of messages the service can process within one second, as long as they fit easily in memory.
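The rule of thumb above can be turned into a small calculation. This is only a sketch: the helper name suggested_prefetch and its floor parameter are hypothetical and not part of Eventail, and the average processing time is assumed to have been measured separately.

```python
import math


def suggested_prefetch(avg_seconds_per_message: float, floor: int = 1) -> int:
    """Number of messages processed in one second, never below `floor`.

    Hypothetical helper: it only encodes the documented rule of thumb.
    """
    if avg_seconds_per_message <= 0:
        raise ValueError("avg_seconds_per_message must be positive")
    return max(floor, math.floor(1.0 / avg_seconds_per_message))


# A handler taking a quarter of a second per message handles four per second.
fast_handler = suggested_prefetch(0.25)
# A handler slower than one second per message falls back to the floor.
slow_handler = suggested_prefetch(2.0)
```

The result could then be assigned to PREFETCH_COUNT in a subclass, provided that many in-flight messages still fit comfortably in memory.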
- __init__(amqp_urls: List[str], event_routing_keys: Sequence[str], command_routing_keys: Sequence[str], logical_service: str, config_routing_keys: Sequence[str] = []) → None
Create a new instance of the consumer class, passing in the list of AMQP URLs used to connect to RabbitMQ.
- Parameters:
amqp_urls (List[str]) – List of AMQP URLs.
The service will try to connect to one of them, in a round-robin fashion.
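Round-robin selection over a list of URLs can be sketched with itertools.cycle. This illustrates the idea only and is not Eventail's actual connection code; the helper name url_rotation and the broker host names are made up for the example.

```python
from itertools import cycle
from typing import Iterator, List


def url_rotation(amqp_urls: List[str]) -> Iterator[str]:
    """Yield the configured URLs endlessly, one after the other."""
    if not amqp_urls:
        raise ValueError("at least one AMQP URL is required")
    return cycle(amqp_urls)


# Hypothetical brokers; each connection attempt takes the next URL.
urls = url_rotation([
    "amqp://guest:guest@broker-a:5672/%2F",
    "amqp://guest:guest@broker-b:5672/%2F",
])
first_attempts = [next(urls) for _ in range(3)]
```

After the last URL has been tried, the rotation wraps around to the first one again.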
- call_later(delay: float, callback: Callable) → object
Call callback after delay seconds.
Return a handle that can be passed to self.cancel_timer()
- handle_command(command: str, payload: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, meta: Dict[str, str]) → Any
Handle incoming commands (may be overridden by subclasses).
The payload is already decoded and is a Python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()). Expected errors should be returned with the return_error method.
The default implementation dispatches the messages by calling methods in the form self.on_COMMAND(payload, reply_to, correlation_id), where COMMAND is what is left after stripping the "service." prefix from the routing key, and returning their return value.
Return a falsy value (default is None) for automatic acknowledgement of the processed message, or a truthy one for manual acknowledgement later.
Never mix manual acknowledgment with automatic end-of-handler acknowledgement.
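An overridden handle_command might look like the sketch below. To keep it runnable without a broker, it uses a stand-in base class (StubService) that merely records results; in real code the base class would be eventail.async_service.pika.Service. The Calculator class, the routing keys, the payload shape and the choice of passing reply_to as the destination are all assumptions made for illustration.

```python
from typing import Any, Dict, List, Tuple


class StubService:
    """Stand-in for Service: records results instead of publishing them."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, str, Dict[str, Any], str, str]] = []

    def return_success(self, destination, message, conversation_id,
                       correlation_id, mandatory=True) -> None:
        self.sent.append(
            ("success", destination, message, conversation_id, correlation_id))

    def return_error(self, destination, message, conversation_id,
                     correlation_id, mandatory=True) -> None:
        self.sent.append(
            ("error", destination, message, conversation_id, correlation_id))


class Calculator(StubService):
    def handle_command(self, command, payload, conversation_id,
                       reply_to, correlation_id, meta):
        try:
            result = payload["a"] / payload["b"]
        except (KeyError, TypeError, ZeroDivisionError) as exc:
            # Expected failure: report it as an error result.
            self.return_error(reply_to, {"reason": str(exc)},
                              conversation_id, correlation_id)
        else:
            self.return_success(reply_to, {"result": result},
                                conversation_id, correlation_id)
        return None  # falsy: the message is acknowledged automatically


service = Calculator()
service.handle_command("calc.divide", {"a": 6, "b": 3},
                       "conv-1", "client.divide.result", "corr-1", {})
service.handle_command("calc.divide", {"a": 1, "b": 0},
                       "conv-1", "client.divide.result", "corr-2", {})
```

Both calls return None, so under the documented contract each message would be acknowledged automatically once the handler returns.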
- handle_config(configuration: str, payload: Dict[str, Any], meta: Dict[str, str]) → bool
Handle incoming configuration (may be overridden by subclasses).
The payload is already decoded and is a Python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()). configuration is the routing key.
This callback must return a boolean flag to tell Eventail whether the service configuration is complete and the service can now process business events.
The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload) -> bool, where KEY is the routing key.
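One way to satisfy the boolean contract is to report completion only once every expected configuration key has been seen. The sketch below is a standalone class rather than a real Service subclass, and the routing keys and the REQUIRED set are hypothetical.

```python
from typing import Any, Dict


class ConfiguredService:
    # Hypothetical configuration routing keys this service waits for.
    REQUIRED = {"billing.config.limits", "billing.config.features"}

    def __init__(self) -> None:
        self.settings: Dict[str, Dict[str, Any]] = {}

    def handle_config(self, configuration: str, payload: Dict[str, Any],
                      meta: Dict[str, str]) -> bool:
        """Store the payload; return True once all required keys arrived."""
        self.settings[configuration] = payload
        return self.REQUIRED.issubset(self.settings)


service = ConfiguredService()
after_first = service.handle_config("billing.config.limits", {"max": 10}, {})
after_second = service.handle_config("billing.config.features", {"beta": True}, {})
```

Here the first call returns False and the second returns True, which is the point at which the service would be allowed to start processing business events.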
- handle_event(event: str, payload: Dict[str, Any], conversation_id: str, meta: Dict[str, str]) → Any
Handle incoming event (may be overridden by subclasses).
The payload is already decoded and is a Python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).
The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload), where KEY is the routing key, and returning their return value.
Return a falsy value (default is None) for automatic acknowledgement of the processed message, or a truthy one for manual acknowledgement later.
Never mix manual acknowledgment with automatic end-of-handler acknowledgement.
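The acknowledgement contract shared by the handlers can be pictured as follows. This sketches the documented behaviour only, not Eventail's internal code; process_delivery and the ack callback are hypothetical names.

```python
from typing import Any, Callable, List


def process_delivery(handler: Callable[..., Any], ack: Callable[[], None],
                     *args: Any) -> bool:
    """Run the handler and acknowledge unless it asked to do so itself.

    Returns True when the message was acknowledged automatically.
    """
    wants_manual_ack = handler(*args)
    if not wants_manual_ack:
        ack()
        return True
    return False


acked: List[str] = []
# Handler returning None: acknowledged as soon as it returns.
auto = process_delivery(lambda payload: None,
                        lambda: acked.append("auto"), {"id": 1})
# Handler returning True: acknowledgement is left to the handler's own code.
manual = process_delivery(lambda payload: True,
                          lambda: acked.append("manual"), {"id": 2})
```

A handler that returns a truthy value takes full responsibility for acknowledging later, which is why the two styles should not be mixed for the same message.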
- handle_result(key: str, payload: Dict[str, Any], conversation_id: str, status: str, correlation_id: str, meta: Dict[str, str]) → Any
Handle incoming result (may be overridden by subclasses).
The payload is already decoded and is a Python data structure compatible with the JSON data model. You should never do any filtering here: use the routing keys instead (see __init__()).
The key is the routing key and status is either “success” or “error”.
The default implementation dispatches the messages by calling methods in the form self.on_KEY(payload, status, correlation_id), where KEY is what is left after stripping the "service." prefix from the routing key, and returning their return value.
Return a falsy value (default is None) for automatic acknowledgement of the processed message, or a truthy one for manual acknowledgement later.
Never mix manual acknowledgment with automatic end-of-handler acknowledgement.
- handle_returned_message(key: str, payload: Dict[str, Any], envelope: Dict[str, str])
Invoked when a message is returned (to be implemented by subclasses).
A message may be returned if it was sent with the mandatory flag set to True and the broker was unable to route it to a queue.
- log(criticity: int, short: str, full: str = '', conversation_id: str = '', additional_fields: Dict = {}) → None
Log to the log bus.
- Parameters:
criticity: int, in the syslog scale
short: str, short description of log
full: str, the full message of the log (appears as message in Graylog)
additional_fields: Dict, data to be merged into the GELF payload as additional fields
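To make the parameters concrete, the sketch below builds a GELF-style dictionary from the same arguments. It follows the public GELF convention that additional fields carry a leading underscore, but it is not Eventail's actual payload: the function name gelf_payload, the _conversation_id field name and the host parameter are assumptions.

```python
import socket
import time
from typing import Any, Dict, Optional

# Syslog severities: 0 is the most severe, 7 the least.
EMERGENCY, ALERT, CRITICAL, ERROR, WARNING, NOTICE, INFO, DEBUG = range(8)


def gelf_payload(criticity: int, short: str, full: str = "",
                 conversation_id: str = "",
                 additional_fields: Optional[Dict[str, Any]] = None,
                 host: Optional[str] = None) -> Dict[str, Any]:
    """Assemble a GELF-like dict (illustrative field layout)."""
    payload: Dict[str, Any] = {
        "version": "1.1",
        "host": host or socket.gethostname(),
        "short_message": short,
        "full_message": full,
        "level": criticity,
        "timestamp": time.time(),
    }
    if conversation_id:
        payload["_conversation_id"] = conversation_id  # assumed field name
    for key, value in (additional_fields or {}).items():
        # GELF additional fields are prefixed with an underscore.
        payload[key if key.startswith("_") else "_" + key] = value
    return payload


example = gelf_payload(ERROR, "payment failed",
                       full="card declined by issuer",
                       conversation_id="conv-42",
                       additional_fields={"user": "alice"},
                       host="demo-host")
```

On the syslog scale, lower numbers are more severe, so ERROR (3) outranks WARNING (4).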
- on_ready() → None
Code to execute once the business service is configured and comes online.
(to be implemented by subclasses)
- publish_configuration(event: str, message: Dict[str, Any]) → None
Publish a configuration on the bus.
The event is the name of the configuration event, and the message is any data conforming to the JSON model.
- publish_event(event: str, message: Dict[str, Any], conversation_id: str, mandatory: bool = False) → None
Publish an event on the bus.
The event is the name of the event, and the message is any data conforming to the JSON model.
If mandatory is True and you have implemented handle_returned_message, then it will be called if your message is unroutable. The default is False because some events may not have any consumer yet.
- return_error(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = True) → None
Send a failure result message.
The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.
- return_success(destination: str, message: Dict[str, Any], conversation_id: str, correlation_id: str, mandatory: bool = True) → None
Send a successful result message.
The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.
- run() → None
Run the service by connecting to RabbitMQ and then starting the IOLoop to block and allow the SelectConnection to operate.
- send_command(command: str, message: Dict[str, Any], conversation_id: str, reply_to: str, correlation_id: str, mandatory: bool = True) → None
Send a command message.
The message is any data conforming to the JSON model. If mandatory is True (default) and you have implemented handle_returned_message, then it will be called if your message is unroutable.
- stop(reconnect=False) → None
Cleanly shut down the connection to RabbitMQ by stopping the consumer with RabbitMQ. When RabbitMQ confirms the cancellation, on_cancelok will be invoked by pika, which will then close the channel and connection. If this method is invoked because CTRL-C was pressed (raising a KeyboardInterrupt exception), the IOLoop is started again: the exception stops the IOLoop, which needs to be running for pika to communicate with RabbitMQ. All of the commands issued prior to starting the IOLoop will be buffered but not processed.
This method is automatically triggered if we receive one of these UNIX signals: signal.SIGHUP, signal.SIGTERM, signal.SIGINT.
- use_exclusive_queues() → None
Force usage of exclusive queues.
This is useful for debugging tools, which should neither leave a queue behind them (overflow risk) nor interfere with other instances.
- use_json() → None
Force sending message serialized in plain JSON instead of CBOR.
- class eventail.async_service.pika.ReconnectingSupervisor(service_factory: Callable[[...], Service], *args, **kwargs)
This is an example supervisor that will reconnect if the nested Service indicates that a reconnect is necessary.
- __init__(service_factory: Callable[[...], Service], *args, **kwargs) → None
Supervises a service and manages automatic reconnection.
The *args and **kwargs are passed unchanged to the service_factory.
- run() → None
Run the service until the service chooses to exit without reconnecting.
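The supervision loop described here can be sketched as below. It is not the library's implementation: FlakyService is a stand-in for a real Service, and the should_reconnect attribute, the supervise function and the reconnect_delay parameter are hypothetical names chosen for the example.

```python
import time


class FlakyService:
    """Stand-in service that asks for a reconnect on its first two runs."""

    instances = 0

    def __init__(self, name: str) -> None:
        type(self).instances += 1
        self.name = name
        # Hypothetical flag a service could set when it stops.
        self.should_reconnect = type(self).instances < 3

    def run(self) -> None:
        pass  # a real service would block here until it stops


def supervise(service_factory, *args, reconnect_delay: float = 0.0,
              **kwargs) -> int:
    """Build and run services until one exits without asking to reconnect.

    Returns the number of runs performed.
    """
    runs = 0
    while True:
        # A fresh instance per attempt; arguments are passed through unchanged.
        service = service_factory(*args, **kwargs)
        service.run()
        runs += 1
        if not getattr(service, "should_reconnect", False):
            return runs
        time.sleep(reconnect_delay)


total_runs = supervise(FlakyService, "demo")
```

Creating a new service through the factory on every attempt avoids reusing connection state left over from the failed run.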
Synchronous publisher
Depends on the kombu library:
pip install .[synchronous]
- class eventail.sync_publisher.Endpoint(amqp_urls: List[str], logical_service: str, connection_max_retries: int | None = None)
A synchronous publishing endpoint for AlloMedia EDA.
- __init__(amqp_urls: List[str], logical_service: str, connection_max_retries: int | None = None) → None
Initialize endpoint.
amqp_urls is a list of broker URLs that will be tried in turn (round-robin style) to send messages.
logical_service is the logical service this endpoint belongs to.
- force_json()
Force serialization of payload into JSON.
- log(criticity: int, short: str, full: str = '', conversation_id: str = '', additional_fields: Dict = {}) → None
Log to the log bus.
- Parameters:
criticity: int, in the syslog scale
short: str, short description of log
full: str, the full message of the log (appears as message in Graylog)
additional_fields: Dict, data to be merged into the GELF payload as additional fields
- publish_configuration(event: str, message: Dict[str, Any], max_retries: int | None = None) → None
Publish a configuration event on the bus.
The event is the name of the event, and the message is any data conforming to the JSON model. max_retries is None by default (retry forever) but can be an int specifying the number of attempts to send the message before the error is raised. The first retry is immediate, then the interval increases by one second at each step.
- publish_event(event: str, message: Dict[str, Any], conversation_id: str, max_retries: int | None = None) → None
Publish an event on the bus.
The event is the name of the event, and the message is any data conforming to the JSON model. max_retries is None by default (retry forever) but can be an int specifying the number of attempts to send the message before the error is raised. The first retry is immediate, then the interval increases by one second at each step.
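The retry timing described for both publishing methods amounts to a linearly growing delay. The generator below is a sketch of that schedule only; retry_delays and its step parameter are hypothetical names, and how the library counts attempts against max_retries may differ in detail.

```python
from itertools import count, islice
from typing import Iterator, Optional


def retry_delays(max_retries: Optional[int] = None,
                 step: float = 1.0) -> Iterator[float]:
    """Delay before each retry: 0, then one more `step` each time.

    With max_retries=None the sequence never ends (retry forever).
    """
    delays = (n * step for n in count())
    return delays if max_retries is None else islice(delays, max_retries)


bounded = list(retry_delays(4))
unbounded_head = list(islice(retry_delays(), 3))
```

With max_retries=4 the waits are 0, 1, 2 and 3 seconds, so a publish that keeps failing gives up after about six seconds of cumulative waiting.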