Source code for the LWR authorization module

from os.path import join

class AllowAnyAuthorization(object):
    """Authorization whose checks never reject anything.

    Every ``authorize_*`` hook is a no-op that returns ``None``.
    """

    def authorize_setup(self):
        """Accept job setup unconditionally."""

    def authorize_tool_file(self, name, contents):
        """Accept the tool file ``name`` with ``contents`` unconditionally."""

    def authorize_execution(self, job_directory, command_line):
        """Accept ``command_line`` for ``job_directory`` unconditionally."""

    def authorize_config_file(self, job_directory, name, path):
        """Accept the config file ``name`` at ``path`` unconditionally."""
class AllowAnyAuthorizer(object):
    """
    Authorizer that permits everything; by default LWR is assumed to be
    secured using a firewall or private_token.
    """

    # Single shared authorization object, created once with the class.
    ALLOW_ANY_AUTHORIZATION = AllowAnyAuthorization()

    def get_authorization(self, tool_id):
        """Return the shared allow-any authorization; ``tool_id`` is not used."""
        shared_authorization = self.ALLOW_ANY_AUTHORIZATION
        return shared_authorization
class ToolBasedAuthorization(AllowAnyAuthorization):
    """Authorization that checks each request against one tool.

    ``tool`` is the object looked up for the job's tool id, or ``None``
    when no tool was found. Every failed check raises ``Exception`` with a
    message prefixed by ``"Unauthorized action attempted: "``.
    """

    def __init__(self, tool):
        self.tool = tool

    def __unauthorized(self, msg):
        """Raise the exception that signals a rejected action."""
        raise Exception("Unauthorized action attempted: %s" % msg)

    def authorize_setup(self):
        """Reject setup when no tool is attached (``self.tool is None``)."""
        if self.tool is None:
            self.__unauthorized("Attempt to setup a tool with id not registered with LWR toolbox.")

    def authorize_tool_file(self, name, contents):
        """Require ``contents`` to equal the file ``name`` in the tool directory.

        The reference copy is read from ``join(tool.get_tool_dir(), name)``.
        Errors from opening or reading that file propagate unchanged.
        """
        tool = self.tool
        tool_dir = tool.get_tool_dir()
        # NOTE(review): ``name`` is joined without validation, so an absolute
        # or ``..``-laden name resolves outside ``tool_dir`` - confirm that
        # callers sanitize it.
        tool_dir_file = join(tool_dir, name)
        # Context manager closes the handle promptly instead of leaving it
        # to garbage collection.
        with open(tool_dir_file) as tool_file:
            allowed_contents = tool_file.read()
        if contents != allowed_contents:
            self.__unauthorized("Attempt to write tool file with contents differing from LWR copy of tool file.")

    def authorize_config_file(self, job_directory, name, path):
        """Reject the config file unless the tool's inputs validator accepts it."""
        if not self.__inputs_validator.validate_config(job_directory, name, path):
            self.__unauthorized("Attempt to utilize unauthorized configfile.")

    def authorize_execution(self, job_directory, command_line):
        """Reject the command unless the tool's inputs validator accepts it."""
        if not self.__inputs_validator.validate_command(job_directory, command_line):
            self.__unauthorized("Attempt to execute unauthorized command.")

    @property
    def __inputs_validator(self):
        """The ``inputs_validator`` attribute of the attached tool."""
        return self.tool.inputs_validator
class ToolBasedAuthorizer(object):
    """
    Work In Progress: Implement tool based white-listing
    of what jobs can run and what those jobs can do.
    """

    def __init__(self, toolbox):
        self.toolbox = toolbox

    def get_authorization(self, tool_id):
        """Return a ``ToolBasedAuthorization`` for ``tool_id``.

        The tool is looked up with ``self.toolbox.get_tool(tool_id)``. If
        that lookup raises an ordinary ``Exception``, the authorization is
        built with ``None`` as its tool rather than failing here.
        """
        tool = None
        try:
            tool = self.toolbox.get_tool(tool_id)
        except Exception:
            # Best-effort lookup: a failure leaves ``tool`` as None.
            # ``Exception`` (not a bare ``except``) lets KeyboardInterrupt
            # and SystemExit propagate instead of being swallowed.
            pass
        return ToolBasedAuthorization(tool)
def get_authorizer(toolbox):
    """Choose the authorizer matching whether a toolbox was supplied.

    A truthy ``toolbox`` yields a ``ToolBasedAuthorizer`` wrapping it;
    a falsy one yields an ``AllowAnyAuthorizer``.
    """
    if not toolbox:
        # No toolbox specified, allow any tools to run.
        return AllowAnyAuthorizer()
    # Use toolbox as a white list.
    return ToolBasedAuthorizer(toolbox)
# ``__all__`` entries must be name strings, not the objects themselves;
# a non-string entry makes ``from <module> import *`` raise TypeError.
__all__ = ['get_authorizer']