Ben Judson • Feb 26th, 2018
During a recent overhaul of Lingo’s private API service, which is built in Python’s Flask framework, we started exploring a “Ports and Adapters” style architecture. There are several components to this architecture, but the pattern I’m going to share here is what we call the ActionRunner, which is a variation on the Command Bus pattern.
The goal is to provide a common gateway for any operation that could change the state of the app (ie write to the database). Part of the inspiration for this came from the adoption of Redux in our web client, which made debugging complex state changes dramatically easier. Although the architecture is quite different, Redux also dictates that every change of state go through a single gateway.
Each unit of business logic (aka use case) is an action, and every time we run an action, it is executed by the action runner. For us, the primary requirements for this gateway are:
We define all of the use cases of the app as instances of an ActionHandlerclass; each action lives in its own module and has a required execute()method which contains business logic, and an optional validate() method for complex request validation. Each action has attributes for the arguments it accepts and the dependencies it requires.
The ActionRunner is responsible for finding and instantiating action handlers with the correct dependencies, passing in the arguments, validating, and executing. When an action succeeds (or fails) it logs the event.
Let’s start with a concrete example of an ActionHandler. The handler declares what it needs:
The argument values all inherit from a simple namedtuple called Argwhich allows defining some metadata about each argument (type, required, default, options, validator function, etc). By the time the execute() method runs, all arguments can be assumed to be valid, with defaults provided.
The method set_args() on the base ActionHandler class loops over the items in the arguments dict to validate the data provided by the client.
Most validation can be handled in set_args(). But for more complex or customized validation, the action handler can also provide a validate()method, which is run after set_args() and before execute().
class CreateDraft(ActionHandler):
    requires = ['repo', 'checkpoint']
    arguments = {
        'space_id': Arg(int, required=True),
        'name': Arg(str, required=True, validator=Arg.validate.trim),
        'notes': Arg(str, validator=Arg.validate.trim),
        'status': Arg(str, default='active', options=['active', 'pending', 'deleted'])
    }
    def validate(self):
        self.checkpoint.has_permission_for_space(self._args.space_id, 'create_draft')
    def execute(self):
        args = self._args
        draft = Draft(
            space_id=args.space_id,
            name=args.name,
            notes=args.notes,
            status=args.status
        )
        self.repo.save(draft)
        return draftJust so we can see how dead simple the views are with this approach, the view that runs the action looks like this (pretty much every action-based view looks the same):
@app.route('/draft', methods=['POST'])
@provide_action_runner
def _create_draft():
    try:
        draft = request.action_runner.execute('create_draft', request.json)
        request.repo.commit()
        return response.success({
            'draft': request.serializer.encode(draft, 'draft')
        })
    except LingoError as err:
        return response.error(err)Our provide_action_runner decorator instantiates the ActionRunner with any dependencies that an action may require, and attaches it to Flask’s global request object. At this writing these dependencies include:
Though most serializing and logging happens outside the action handlers, a small number of handlers have need for these objects internally.
This is the class that handles orchestrating the execution of the action handlers. Apart from initialization, the ActionRunner only has one public method, execute(). It’s not very complex, but there are a few things to note:
class ActionRunner():
    """A kind of Command Bus for Lingo actions.
    The execute() method instantiates the action class, sets arguments, runs validation, then
    executes the actual action. Afterward, it logs the action.
    Instantiate with whatever dependencies actions need, and then call execute() with action type
    and action args.
    """
    def __init__(self, **kwargs):
        self._populate(kwargs)
        self.action_runner = self
    def _populate(self, attributes):
        for key, value in attributes.items():
            setattr(self, key, value)
    def execute(self, action_type, action, **kwargs):
        """Execute an action.
        Args:
            action_type (str): The name of the action (used to import action handler)
            action (dict): The arguments that will be passed to the action itself
        """
        user_id = getattr(self, 'user_id', 0)
        action_name, action_handler = self._get_action_handler(action_type)
        self._write_preflight_log(action_type, action, action_handler, user_id)
        action_handler = self._prepare_action_handler(action_handler, action)
        try:
            result = action_handler.execute()
        except LingoError as e:
            self._write_error_log(action_name, action_handler, user_id, e)
            raise e
        else:
            self._write_success_log(action_name, action_handler, user_id)
        return result
    def _get_action_handler(self, action_type):
        """Resolve action module by name and import handler."""
        if type(action_type) is str:
            comps = action_type.split('.')
            name = comps.pop()
            module = 'actions'
            if len(comps) > 0:
                module = 'actions.{}'.format('.'.join(comps))
            handler = import_module(
                '.%s' % name, mod).handler
            return action_type, handler
        raise TypeError('Action type should be string, not {}'
                        .format(type(action_type).__name__))
    def _prepare_action_handler(self, action_handler, action):
        """Prepare action handler for execution.
        
        After this method runs, all dependency injection and validation will be complete.
        """
        action_handler = action_handler()
        self._inject_dependencies(action_handler)
        args = {} if action is None else action
        action_handler.set_args(**args)
        action_handler.validate()
        return action_handler
    def _inject_dependencies(self, action_handler):
        try:
            action_handler(**{dep: getattr(self, dep) for dep in action_handler.requires})
        except AttributeError:
            raise ValueError('Cannot provide requirements for action: {}'
                              .format(type(action_handler).__name__))
    def _write_preflight_log(self, action_name, action, action_handler, user_id):
        """Logs action name along with raw data submitted by user."""
        self.logger.info('Action submitted: [{}] [{}] [{}]'.format(
                         user_id, action_name, json.dumps(action)))
    def _write_success_log(self, action_name, action_handler, user_id):
        """Logs action name and arguments after they have gone through validation."""
        self.logger.info('Action succeeded: [{}] [{}] [{}]'.format(
                         user_id, action_name, str(action_handler)))
    def _write_error_log(self, action_name, action_handler, user_id, e):
        """Logs action name, post-validation arguments, and information about error raised."""
        self.logger.info('Action raised error: [{}] [{}] [{}] [{}] [{}]'.format(
            user_id, action_name, e.message, e.code, str(action_handler)
        ))The class that all action handlers inherit from is relatively simple, mostly handling validation of arguments in the set_args() method. We described this above, but here is an example with some basic validation: making sure the client provides required values, providing defaults, and checking the provided value against an options list. In the full version of this class, we also do some extra type conversion, for instance building a datetime object from a user-provided string, or casting strings to integers.
class ActionHandler:
    """Base class for action handlers, which contain the logic for all the core use cases.
    Each action handler should be defined in its own file, exposed as `handler`, and executed
    by the ActionRunner.
    Attributes:
    - requires: list of objects that should be injected by ActionRunner
    - arguments: arguments that should be set before execute() is called
    """
    __metaclass__ = abc.ABCMeta
    requires = []
    arguments = {}
    def set_args(self, **kwargs):
        """Validate and populate action arguments.
        Each action instance defines the arguments it accepts, including the name, value type,
        default value, etc (see Arg for all options). This method loops through provided arguments
        and validates against the definition of the Arg. If any args don't validate, an error is
        raised here. Otherwise, they are set on the action instance _args attribute as a namedtuple.
        """
        self.original_arguments = kwargs
        Args = namedtuple('Args', [k for k, v in self.arguments.items()])
        Args.__new__.__defaults__ = (None,) * len(self.arguments.items())
        valid = {}
        for k, arg in self.arguments.items():
            val = kwargs.get(k, None)
            if val is None and arg.default:
                val = arg.default() if callable(arg.default) else arg.default
            if val is None and arg.required:
                raise LingoError(LingoError.Code.invalid_params, '{} is required'.format(k))
            if arg.options and val not in arg.options:
                raise LingoError(LingoError.Code.invalid_params,
                    'Invalid value ({}) provided for {}. Expected {}'.format(val, k, arg.options))
            if callable(arg.validator):
                val = arg.validator(val, k)
            valid[k] = val
        self._args = Args(**valid)
    def validate(self):
        """Any special validation apart from arg types can be placed here."""
        return
    @abc.abstractmethod
    def execute(self):
        """The actual business logic of the action goes here."""
        passAlthough we didn’t show it above, our ActionRunner also allows adding actions to a Celery queue with an alternative execute_async() method. Without going into a lot of detail about this, the fact that the action is just a name string and a dictionary of primitive arguments means that it’s very easy to send an action over the wire to a background queue.
We generally run validation before it is queued, so that validation errors can be sent back to the client, and then pass along the ID of the user that invoked the action so that permissions can be checked when the action is finally run in Celery. So far, this system has worked very well, and turns every use case into a potential background task almost for free.
Since PHP’s Laravel framework (which is pretty big on Domain Driven Design concepts) incorporated a Command Bus into core, there is a fair amount of discussion of the pattern and its uses in the PHP community.
I haven’t seen much discussion of this pattern in Python circles (which is one of the big reasons I wanted to write this post). However, I was able to find a couple of gists showing bare-bones implementations of the pattern in Python:
If you know of other examples, please post them in the comments!