| #!/usr/bin/env python | 
 | # Copyright 2017 The Chromium Authors. All rights reserved. | 
 | # Use of this source code is governed by a BSD-style license that can be | 
 | # found in the LICENSE file. | 
 | """Custom swarming triggering script. | 
 |  | 
 | This script does custom swarming triggering logic, to enable device affinity | 
 | for our bots, while lumping all trigger calls under one logical step. | 
 |  | 
 | For the perf use case of device affinity, this script now enables soft device | 
 | affinity.  This means that it tries to smartly allocate jobs to bots based | 
 | on what is currently alive and what bot the task was last triggered on, | 
preferring that last triggered bot if available.  If the
--multiple-trigger-configs flag is specified then this script overrides
the soft device affinity functionality in favor of the provided ids.
 |  | 
 | The algorithm is roughly the following: | 
 |  | 
Find eligible bots, healthy or not.
  * Query swarming for eligible bots based on the dimensions passed in
    on the swarming call.  A bot is considered healthy if it is neither
    quarantined nor dead (is_dead).
 |  | 
 | Of the eligible bots determine what bot id to run the shard on. | 
 | (Implementation in _select_config_indices_with_soft_affinity) | 
  * First query swarming for the last task that ran that shard with
    given dimensions.  This assumes results are returned most recent first.
 |   * Check if the bot id that ran that task is alive, if so trigger | 
 |     on that bot again. | 
 |   * If that bot isn't alive, allocate to another alive bot or if no | 
 |     other alive bots exist, trigger on the same dead one. | 
 |  | 
 | Scripts inheriting must have roughly the same command line interface as | 
 | swarming.py trigger. It modifies it in the following ways: | 
 |  | 
 |  * Intercepts the dump-json argument, and creates its own by combining the | 
 |    results from each trigger call. | 
 * Intercepts the dimensions from the swarming call and determines what bots
   are healthy based on the above device affinity algorithm, and triggers
   each shard on the bot selected for it.
 |  * Adds a tag to the swarming trigger job with the shard so we know the last | 
 |    bot that ran this shard. | 
 |  | 
 | This script is normally called from the swarming recipe module in tools/build. | 
 |  | 
 | """ | 
 |  | 
 | import argparse | 
 | import json | 
 | import os | 
 | import subprocess | 
 | import sys | 
 | import tempfile | 
 | import urllib | 
 |  | 
 | import base_test_triggerer | 
 |  | 
 | class Bot(object): | 
 |   """ Eligible bots to run the task""" | 
 |   def __init__(self, bot_id, is_alive): | 
 |     self._bot_id = bot_id | 
 |     self._is_alive = is_alive | 
 |  | 
 |   def id(self): | 
 |     return self._bot_id | 
 |  | 
 |   def is_alive(self): | 
 |     return self._is_alive | 
 |  | 
 |   def as_json_config(self): | 
 |     return {'id': self._bot_id} | 
 |  | 
 | class PerfDeviceTriggerer(base_test_triggerer.BaseTestTriggerer): | 
 |   def __init__(self, args, swarming_args): | 
 |     super(PerfDeviceTriggerer, self).__init__() | 
 |     if not args.multiple_trigger_configs: | 
 |       # Represents the list of current dimensions requested | 
 |       # by the parent swarming job. | 
 |       self._dimensions = self._get_swarming_dimensions(swarming_args) | 
 |  | 
 |       # Store what swarming server we need and whether or not we need | 
 |       # to send down authentication with it | 
 |       self._swarming_server = self._get_swarming_server(swarming_args) | 
 |       self._service_account = self._get_service_account(swarming_args) | 
 |  | 
 |       # Map of all existing bots in swarming that satisfy the current | 
 |       # set of dimensions indexed by bot id. | 
 |       # Note: this assumes perf bot dimensions are unique between | 
 |       # configurations. | 
 |       self._eligible_bots_by_ids = ( | 
 |           self._query_swarming_for_eligible_bot_configs(self._dimensions)) | 
 |  | 
 |   def append_additional_args(self, args, shard_index): | 
 |     # Append a tag to the swarming task with the shard number | 
 |     # so we can query for the last bot that ran a specific shard. | 
 |     tag = 'shard:%d' % shard_index | 
 |     shard_tag = ['--tag', tag] | 
 |     # Need to append this before the dash if present so it gets fed to | 
 |     # the swarming task itself. | 
 |     if '--' in args: | 
 |       dash_ind = args.index('--') | 
 |       return args[:dash_ind] + shard_tag + args[dash_ind:] | 
 |     else: | 
 |       return args + shard_tag | 
 |  | 
 |   def parse_bot_configs(self, args): | 
 |     if args.multiple_trigger_configs: | 
 |       super(PerfDeviceTriggerer, self).parse_bot_configs(args) | 
 |     else: | 
 |       self._bot_configs = [] | 
 |       # For each eligible bot, append the dimension | 
 |       # to the eligible bot_configs | 
 |       for  _, bot in self._eligible_bots_by_ids.iteritems(): | 
 |         self._bot_configs.append(bot.as_json_config()) | 
 |  | 
 |   def select_config_indices(self, args, verbose): | 
 |     if args.multiple_trigger_configs: | 
 |       # If specific bot ids were passed in, we want to trigger a job for | 
 |       # every valid config regardless of health status since | 
 |       # each config represents exactly one bot in the perf swarming pool. | 
 |       return range(args.shards) | 
 |     return self._select_config_indices_with_soft_affinity(args, verbose) | 
 |  | 
 |   def _select_config_indices_with_soft_affinity(self, args, verbose): | 
 |     # First make sure the number of shards doesn't exceed the | 
 |     # number of eligible bots.  This means there is a config error somewhere. | 
 |     if args.shards > len(self._eligible_bots_by_ids): | 
 |       raise ValueError('Not enough available machines exist in in swarming' | 
 |                        'pool. Contact labs to rack in more hardware') | 
 |  | 
 |     shard_to_bot_assignment_map = {} | 
 |     unallocated_bots = set(self._eligible_bots_by_ids.values()) | 
 |     for shard_index in xrange(args.shards): | 
 |       bot_id = self._query_swarming_for_last_shard_id(shard_index) | 
 |       if bot_id: | 
 |         bot = self._eligible_bots_by_ids[bot_id] | 
 |         shard_to_bot_assignment_map[shard_index] = bot | 
 |         unallocated_bots.discard(bot) | 
 |       else: | 
 |         shard_to_bot_assignment_map[shard_index] = None | 
 |  | 
 |     # Now create sets of remaining healthy and bad bots | 
 |     unallocated_healthy_bots = {b for b in unallocated_bots if b.is_alive()} | 
 |     unallocated_bad_bots = {b for b in unallocated_bots if not b.is_alive()} | 
 |  | 
 |     # Try assigning healthy bots for new shards first. | 
 |     for shard_index, bot in sorted(shard_to_bot_assignment_map.iteritems()): | 
 |       if not bot and unallocated_healthy_bots: | 
 |         shard_to_bot_assignment_map[shard_index] = \ | 
 |             unallocated_healthy_bots.pop() | 
 |         if verbose: | 
 |           print 'First time shard %d has been triggered' % shard_index | 
 |       elif not bot: | 
 |         shard_to_bot_assignment_map[shard_index] = unallocated_bad_bots.pop() | 
 |  | 
 |     # Handle the rest of shards that were assigned dead bots: | 
 |     for shard_index, bot in sorted(shard_to_bot_assignment_map.iteritems()): | 
 |       if not bot.is_alive() and unallocated_healthy_bots: | 
 |         dead_bot = bot | 
 |         healthy_bot = unallocated_healthy_bots.pop() | 
 |         shard_to_bot_assignment_map[shard_index] = healthy_bot | 
 |         if verbose: | 
 |           print ('Device affinity broken for bot %s, new ' | 
 |                  'mapping to bot %s' % (dead_bot.id(), healthy_bot.id())) | 
 |  | 
 |     # Now populate the indices into the bot_configs array | 
 |     selected_configs = [] | 
 |     for shard_index in xrange(args.shards): | 
 |       selected_configs.append(self._find_bot_config_index( | 
 |           shard_to_bot_assignment_map[shard_index].id())) | 
 |     return selected_configs | 
 |  | 
 |  | 
 |   def _query_swarming_for_eligible_bot_configs(self, dimensions): | 
 |     """ Query Swarming to figure out which bots are available. | 
 |  | 
 |       Returns: a dictionary in which the keys are the bot id and | 
 |       the values are Bot object that indicate the health status | 
 |       of the bots. | 
 |     """ | 
 |     values = [] | 
 |     for key, value in sorted(dimensions.iteritems()): | 
 |       values.append(('dimensions', '%s:%s' % (key, value))) | 
 |  | 
 |     query_result = self.query_swarming( | 
 |         'bots/list', values, True, server=self._swarming_server, | 
 |         service_account=self._service_account) | 
 |     perf_bots = {} | 
 |     for bot in query_result['items']: | 
 |       alive = (not bot['is_dead'] and not bot['quarantined']) | 
 |       perf_bots[bot['bot_id']] = Bot(bot['bot_id'], alive) | 
 |     return perf_bots | 
 |  | 
 |   def _find_bot_config_index(self, bot_id): | 
 |     # Find the index into the bot_config map that | 
 |     # maps to the bot id in question | 
 |     for i, dimensions in enumerate(self._bot_configs): | 
 |       if dimensions['id'] == bot_id: | 
 |         return i | 
 |     return None | 
 |  | 
 |   def _query_swarming_for_last_shard_id(self, shard_index): | 
 |     # Per shard, query swarming for the last bot that ran the task | 
 |     # Example: swarming.py query -S server-url.com --limit 1 \\ | 
 |     #  'tasks/list?tags=os:Windows&tags=pool:chrome.tests.perf&tags=shard:12' | 
 |     values = [ | 
 |       ('tags', '%s:%s' % (k, v)) for k, v in self._dimensions.iteritems() | 
 |     ] | 
 |     # Append the shard as a tag | 
 |     values.append(('tags', '%s:%s' % ('shard', str(shard_index)))) | 
 |     values.sort() | 
 |     # TODO(eyaich): For now we are ignoring the state of the returned | 
 |     # task (ie completed, timed_out, bot_died, etc) as we are just | 
 |     # answering the question "What bot did we last trigger this shard on?" | 
 |     # Evaluate if this is the right decision going forward. | 
 |  | 
 |     # Query for the last task that ran with these dimensions and this shard | 
 |     query_result = self.query_swarming( | 
 |           'tasks/list', values, True, limit='1', server=self._swarming_server, | 
 |          service_account=self._service_account) | 
 |     tasks = query_result.get('items') | 
 |     if tasks: | 
 |       # We queried with a limit of 1 so we could only get back | 
 |       # the most recent which is what we care about. | 
 |       task = tasks[0] | 
 |       if 'bot_id' in task: | 
 |         return task['bot_id'] | 
 |       for tag in task['tags']: | 
 |         if tag.startswith('id:'): | 
 |           return tag[len('id:'):] | 
 |     # No eligible shard for this bot | 
 |     return None | 
 |  | 
 |   def _get_swarming_dimensions(self, args): | 
 |     dimensions = {} | 
 |     for i in xrange(len(args) - 2): | 
 |       if args[i] == '--dimension': | 
 |         dimensions[args[i+1]] = args[i+2] | 
 |     return dimensions | 
 |  | 
 |   def _get_swarming_server(self, args): | 
 |     for i in xrange(len(args)): | 
 |       if '--swarming' in args[i]: | 
 |         server = args[i+1] | 
 |         slashes_index = server.index('//') + 2 | 
 |         # Strip out the protocol | 
 |         return server[slashes_index:] | 
 |  | 
 |   def _get_service_account(self, args): | 
 |     for i in xrange(len(args) - 1): | 
 |       if '--auth-service-account-json' in args[i]: | 
 |         return args[i+1] | 
 |  | 
 | def main(): | 
 |   # Setup args for common contract of base class | 
 |   parser = base_test_triggerer.BaseTestTriggerer.setup_parser_contract( | 
 |       argparse.ArgumentParser(description=__doc__)) | 
 |   args, remaining = parser.parse_known_args() | 
 |  | 
 |   triggerer = PerfDeviceTriggerer(args, remaining) | 
 |   return triggerer.trigger_tasks(args, remaining) | 
 |  | 
 | if __name__ == '__main__': | 
 |   sys.exit(main()) | 
 |  |