Observer DSL

Similar to loading nodes, Arborist has pluggable hooks for sourcing observer data from various sources (databases, LDAP, etc.). Only file-based sources ship by default. This reference page outlines all the options for the file-backed source.

Overview

Observer definition files are loaded by the Observer daemon at startup. You can organize them however you like: files are discovered at any directory depth, but only files ending in .rb are considered.
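The discovery rule (any depth, .rb only) behaves like a recursive glob. The sketch below is only an illustration of that rule, not Arborist's actual loader; `observer_files` is a hypothetical helper name.

```ruby
require 'tmpdir'
require 'fileutils'

# Hypothetical helper (not part of Arborist): collect every observer
# definition file under +dir+, at any depth, keeping only ".rb" files.
def observer_files( dir )
    return Dir.glob( File.join(dir, '**', '*.rb') ).sort
end

# Usage: build a small scratch tree and list what would be loaded.
Dir.mktmpdir do |dir|
    FileUtils.mkdir_p( File.join(dir, 'web', 'render') )
    File.write( File.join(dir, 'history.rb'), '' )
    File.write( File.join(dir, 'web', 'render', 'alerts.rb'), '' )
    File.write( File.join(dir, 'web', 'README.txt'), '' )

    # Prints history.rb and web/render/alerts.rb; README.txt is skipped.
    puts observer_files( dir ).map {|path| path.delete_prefix(dir + '/') }
end
```

The `**` segment also matches zero directories, so top-level files are picked up alongside deeply nested ones.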

An observer has a human-readable description for use in user interfaces and logging, plus optional subscriptions that declare which events it wants to receive. When an event from the Manager matches one of an observer's subscriptions, each of its action blocks is run with that event, subject to that block's own modifiers.

Subscribe

Observers internally use the Event API to register the events they are interested in. Each observer can have zero or more subscriptions. Omitting subscriptions entirely is equivalent to subscribing to all node events at the Arborist root node. A subscription accepts the following keywords:

keyword   optional   description
to        no         What event to subscribe to. Unknown events are ignored. Here's the list of possible node events.
on        yes        A node identifier to attach the subscription to. Events flow upward through the tree, so attaching to a node gets events for that node and all of its descendants. Omitting this is equivalent to attaching to the root node, which sees all events.
where     yes        A hash of criteria the event must match to be acted on.
exclude   yes        A hash of criteria the event must not match to be acted on.
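To make the "events flow upward" rule concrete, here is a toy model of attaching subscriptions to nodes in a tree. `TreeNode`, `subscribe`, and `publish` are hypothetical names for illustration only; this is not Arborist's Event API, just a sketch of the propagation behavior the table describes.

```ruby
# Hypothetical, simplified model of subscription attachment. It only shows
# why attaching to a node sees events for that node and everything beneath it.
class TreeNode
    attr_reader :identifier, :parent

    def initialize( identifier, parent = nil )
        @identifier    = identifier
        @parent        = parent
        @subscriptions = []
    end

    # Attach a callback that receives (event_type, origin_identifier).
    def subscribe( &callback )
        @subscriptions << callback
    end

    # Deliver the event to this node's subscribers, then bubble it up.
    def publish( event_type, origin = self.identifier )
        @subscriptions.each {|cb| cb.call(event_type, origin) }
        @parent&.publish( event_type, origin )
    end
end

root = TreeNode.new( 'root' )
host = TreeNode.new( 'web01', root )
svc  = TreeNode.new( 'web01-http', host )

seen_at_root = []
seen_at_host = []
root.subscribe {|type, origin| seen_at_root << [type, origin] }
host.subscribe {|type, origin| seen_at_host << [type, origin] }

svc.publish( 'node.down' )
host.publish( 'node.up' )
root.publish( 'node.update' )

p seen_at_host  # => [["node.down", "web01-http"], ["node.up", "web01"]]
p seen_at_root  # => [["node.down", "web01-http"], ["node.up", "web01"], ["node.update", "root"]]
```

The subscription on `web01` never sees the event published at the root, while the root subscription sees everything, which matches the note that omitting `on` is the same as attaching at the root.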

Most node events are issued only once per transition. To react to a specific transition (for example, down to up), subscribe to the node.delta event and match on the change itself.

Arborist::Observer "Nodes having a problem" do
    subscribe to: 'node.down'
    subscribe to: 'node.warn'

    # ...
end

Arborist::Observer "Relay changes to render service nodes" do
    subscribe to: 'node.update', where: { type: 'service', tag: 'render' }

    # ...
end

# You -could- just subscribe to 'node.up', but this example shows how to
# omit the initial transition from unknown -> up.
#
Arborist::Observer "Recovered nodes" do
    subscribe to: 'node.delta', where: { delta: { status: [ 'down', 'up' ] } }
    subscribe to: 'node.delta', where: { delta: { status: [ 'warn', 'up' ] } }
    subscribe to: 'node.delta', where: { delta: { status: [ 'acked', 'up' ] } }

    # ...
end
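One way to picture how nested `where` and `exclude` criteria apply to a delta event is a recursive hash match. The sketch below assumes, as the examples above suggest, that a serialized delta event carries `delta` as a hash of `[old, new]` pairs with string keys. `criteria_match?` and `event_wanted?` are hypothetical helpers and do not reproduce every rule Arborist itself applies.

```ruby
# Hypothetical matcher: every key in +criteria+ must match the event data.
# Hash values recurse (so { delta: { status: [...] } } works); anything else
# is compared with ==. Keys are looked up as strings, assuming serialized
# events arrive with string keys.
def criteria_match?( data, criteria )
    criteria.all? do |key, expected|
        actual = data[ key.to_s ]
        if expected.is_a?( Hash )
            actual.is_a?( Hash ) && criteria_match?( actual, expected )
        else
            actual == expected
        end
    end
end

# An event is wanted if it matches +where+ and does not match +exclude+.
# An empty +exclude+ excludes nothing.
def event_wanted?( data, where: {}, exclude: {} )
    return false unless criteria_match?( data, where )
    return exclude.empty? || !criteria_match?( data, exclude )
end

recovered = { 'type' => 'host', 'delta' => { 'status' => ['down', 'up'] } }
first_up  = { 'type' => 'host', 'delta' => { 'status' => ['unknown', 'up'] } }
criteria  = { delta: { status: ['down', 'up'] } }

p event_wanted?( recovered, where: criteria )                             # => true
p event_wanted?( first_up, where: criteria )                              # => false
p event_wanted?( recovered, where: criteria, exclude: { type: 'host' } )  # => false
```

This is why the "Recovered nodes" observer needs one subscription per starting status: each `where` pins an exact `[old, new]` pair, so the initial unknown to up transition never matches.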

Action

An action is a Ruby block that runs when a subscribed event arrives. It is passed the serialized event that triggered it and a hash of the collected events, keyed by time. An observer can have any number of action blocks, and each block can take modifiers that further control when it fires:

argument          description
after             Only execute the block once this many events have been collected. Defaults to 1, meaning it runs for every event.
during            A Schedulability time declaration. The block only runs if the current time falls within this schedule.
within            Only execute the block if the after count is reached within this timeframe (in seconds).
ignore_flapping   If the node that generated the event is marked as flapping, the block is not executed.
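The interplay of `after` and `within` can be sketched as a small buffer keyed by event time. This is a hypothetical model (`ActionGate` is not an Arborist class), and it assumes one reading of `within`: a sliding window where events older than `within` seconds are dropped before the count is checked. `during` and `ignore_flapping` are not modeled.

```ruby
# Hypothetical model of the after: and within: action modifiers.
# Events are buffered in a hash keyed by time; the block fires once
# +after+ events are buffered. If +within+ is set, events older than
# that many seconds are dropped first. The buffer is cleared after firing.
class ActionGate
    def initialize( after: 1, within: nil, &block )
        @after  = after
        @within = within
        @block  = block
        @events = {}
    end

    # Returns true if the block ran for this event.
    def handle( event, time = Time.now )
        @events[ time ] = event
        @events.delete_if {|t, _| time - t > @within } if @within
        return false if @events.size < @after

        @block.call( event, @events.dup )
        @events.clear
        return true
    end
end

fired = []
gate  = ActionGate.new( after: 3, within: 60 ) {|last, all| fired << [last, all.size] }
t0    = Time.at( 0 )
gate.handle( 'e1', t0 )
gate.handle( 'e2', t0 + 10 )
gate.handle( 'e3', t0 + 100 )  # e1 and e2 are now older than 60s and are dropped
gate.handle( 'e4', t0 + 110 )
gate.handle( 'e5', t0 + 120 )  # e3, e4, e5 fall inside 60s, so the block fires
p fired                        # => [["e5", 3]]
```

With the defaults (`after: 1`, no `within`) every event fires the block immediately, matching the table's description of the default.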

Every observer also has access to a client, which you can use to call back to the Manager for more information.

Arborist::Observer "Log updates to special nodes" do
    subscribe to: 'node.update', where: { type: 'special' }

    # Do something with each event.
    #
    action do |event|
        # ...
    end

    # Perform an action in batches of 50, but only during working hours
    #
    action( after: 50, during: 'wd {Mon-Fri} hr {9am-4pm}' ) do |last_event, all_events|
        # "all_events" is a hash of the 50 update events, keyed by event time.
    end
end

Summarize

A summarize block is very similar to an action block, but it can fire on time intervals instead of (or as well as) event counts. It also accepts modifiers that control when it fires:

argument   description
count      Only execute the block once this many events have been collected.
every      Only execute the block once this much time has passed (in seconds).
during     A Schedulability time declaration. The block only runs if the current time falls within this schedule.
# The refinements supply the 1.hour / 30.seconds helpers used below.
using Arborist::TimeRefinements

Arborist::Observer "Examples!" do
    subscribe to: 'node.update', where: { type: 'special' }

    # These two blocks are functionally identical:
    # "Run the block after 50 events"
    #
    action( after: 50 ) do |last_event, all_events|
        # ...
    end
    summarize( count: 50 ) do |last_event, all_events|
        # ...
    end

    # "Run this block only after at least 50 events have accumulated, but only check once an hour"
    summarize( count: 50, every: 1.hour ) do |last_event, all_events|
        # ...
    end

    # "Run this block every 30 seconds regardless of event count."
    # If no events have arrived, the block does not run.
    summarize( every: 30.seconds ) do |last_event, all_events|
        # ...
    end
end
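The `count`/`every` combination can be pictured as a periodic check over an event buffer. The sketch below is a hypothetical model (`SummaryGate`, `add`, and `tick` are illustrative names, not Arborist's implementation): a check happens at most once per `every` seconds, and it only fires if at least `count` events, and at least one event, have accumulated. `during` is not modeled.

```ruby
# Hypothetical model of summarize( count:, every: ).
# Events accumulate via #add; #tick is called on a timer. A check happens
# only once +every+ seconds have passed since the previous check, and it
# fires only if the buffer holds at least +count+ events (never when empty).
class SummaryGate
    def initialize( count: 1, every: 0, start: Time.now, &block )
        @count      = count
        @every      = every
        @block      = block
        @events     = {}
        @last_check = start
    end

    def add( event, time = Time.now )
        @events[ time ] = event
    end

    # Returns true if the block ran on this tick.
    def tick( now = Time.now )
        return false if now - @last_check < @every
        @last_check = now
        return false if @events.empty? || @events.size < @count

        last_time = @events.keys.max
        @block.call( @events[last_time], @events.dup )
        @events.clear
        return true
    end
end

runs = []
t0   = Time.at( 0 )
gate = SummaryGate.new( every: 30, start: t0 ) {|last, all| runs << [last, all.size] }
gate.tick( t0 + 30 )   # nothing collected yet: no-op
gate.add( 'a', t0 + 31 )
gate.add( 'b', t0 + 40 )
gate.tick( t0 + 45 )   # only 15s since the last check: skipped
gate.tick( t0 + 60 )   # 30s elapsed and 2 events buffered: fires
p runs                 # => [["b", 2]]
```

Recording the check time even when nothing fires is what gives "only check once an hour" its meaning: a batch that reaches 50 events mid-hour still waits for the next check.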

Example

Here's a real-world example that uses an observer to extend Arborist's behavior. This observer keeps a history of events by writing them to a PostgreSQL table.

# The table this logs to looks like this:
# 
#                                   Table "arborist.history"
# +--------+--------------------------+------------------------------------------------------+
# | Column |           Type           |                      Modifiers                       |
# +--------+--------------------------+------------------------------------------------------+
# | id     | integer                  | not null default nextval('history_id_seq'::regclass) |
# | time   | timestamp with time zone | not null default now()                               |
# | node   | character varying(100)   | not null                                             |
# | event  | character varying(30)    | not null                                             |
# | data   | jsonb                    | default '{}'::jsonb                                  |
# | type   | character varying(30)    | not null                                             |
# | parent | character varying(100)   | default NULL::character varying                      |
# +--------+--------------------------+------------------------------------------------------+
# Indexes:
#     "history_pkey" PRIMARY KEY, btree (id)
#     "event_idx" btree (event)
#     "time_idx" btree ("time" DESC)
#     "node_idx" btree (node)

require 'pg'
require 'json'

# Password in an external .pgpass file.
db = PG.connect( user: 'arborist', dbname: 'db', host: 'database.example.com' )

# Log all events for posterity.
#
Arborist::Observer "Save history for state transitions" do
    subscribe to: 'node.acked'
    subscribe to: 'node.disabled'
    subscribe to: 'node.down'
    subscribe to: 'node.quieted'
    subscribe to: 'node.unknown'
    subscribe to: 'node.up'
    subscribe to: 'node.warn'

    action do |event|
        begin
            id    = event[ 'identifier' ]
            node  = event[ 'data' ]
            event = event[ 'type' ].sub( 'node.', '' )
            type  = node[ 'type' ]

            parent = case type
                when 'host'
                    nil
                else
                    node[ 'parent' ]
                end

            data = case event
                when 'acked', 'disabled'
                    node[ 'ack' ]
                when 'down'
                    node[ 'errors' ]
                when 'quieted'
                    node[ 'quieted_reasons' ]
                when 'warn'
                    node[ 'warnings' ]
                else
                    {}
                end

            # Fall back to an empty object if the node lacks the expected key.
            data = JSON.dump( data || {} )
            db.exec_params(
                'insert into history ( node, parent, type, event, data ) values ( $1, $2, $3, $4, $5 )',
                [ id, parent, type, event, data ]
            )

        rescue => err
            db.reset rescue nil
            self.log.warn "Unable to write history: %s" % [ err.message ]
        end
    end
end
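Because the action block mixes event parsing with database I/O, one option is to pull the mapping into a plain method that can be tested without PostgreSQL. The sketch below is a hypothetical refactoring (`history_row` is an illustrative name) that assumes the same serialized event layout the example uses: `identifier`, `type`, and the node hash under `data`.

```ruby
require 'json'

# Hypothetical helper: map a serialized node event to the values inserted
# into the history table, in (node, parent, type, event, data) order.
def history_row( event )
    node       = event[ 'data' ]
    event_type = event[ 'type' ].sub( 'node.', '' )
    type       = node[ 'type' ]
    parent     = type == 'host' ? nil : node[ 'parent' ]

    data = case event_type
        when 'acked', 'disabled' then node[ 'ack' ]
        when 'down'              then node[ 'errors' ]
        when 'quieted'           then node[ 'quieted_reasons' ]
        when 'warn'              then node[ 'warnings' ]
        else
            {}
        end

    return [ event['identifier'], parent, type, event_type, JSON.dump(data || {}) ]
end

sample = {
    'identifier' => 'web01-http',
    'type'       => 'node.down',
    'data'       => { 'type' => 'service', 'parent' => 'web01',
                      'errors' => { 'http' => 'timeout' } }
}
row = history_row( sample )
p row  # => ["web01-http", "web01", "service", "down", "{\"http\":\"timeout\"}"]
```

The action block would then reduce to `db.exec_params( sql, history_row(event) )` inside the existing begin/rescue, keeping the error handling where it is.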