Source code for bastio.ssh.client

# Copyright 2013 Databracket LLC
# See LICENSE file for details.

"""
:module: bastio.ssh.client
:synopsis: A module for SSH client implementations.
:author: Amr Ali <amr@databracket.com>

.. autoclass:: BackendConnector
    :members:
"""

__author__ = "Amr Ali"
__copyright__ = "Copyright 2013 Databracket LLC"
__license__ = "GPLv3+"

import time
import socket
import paramiko
import collections
import Queue as queue

from bastio import __version__
from bastio.log import Logger
from bastio.mixin import KindSingletonMeta, public
from bastio.configs import GlobalConfigStore
from bastio.concurrency import GlobalThreadPool, Task
from bastio.ssh.protocol import Netstring, MessageParser, ProtocolMessage
from bastio.excepts import (BastioBackendError, BastioEOFError,
        BastioNetstringError, BastioMessageError, reraise)

# Set paramiko client ID
paramiko.Transport._CLIENT_ID = "bastio-{}".format(__version__)

@public
[docs]class BackendConnector(object): """A singleton to establish and maintain a secure connection with the backend over a specific subsystem channel. This connector supports registering of endpoints where processors can register their endpoint to communicate with the backend. It is guaranteed that the messages will be delivered ASAP but the actual ETA is chaotic. """ __metaclass__ = KindSingletonMeta EndPoint = collections.namedtuple("EndPoint", "ingress egress") Subsystem = 'bastio-agent' def __init__(self): cfg = GlobalConfigStore() self._tp = GlobalThreadPool() self._username = cfg.agent_username self._agent_key = cfg.agentkey self._backend_addr = (cfg.host, cfg.port) self._backend_hostkey = cfg.backend_hostkey self._logger = Logger() self._endpoints = [] self._tx = queue.Queue() self._conn_handler_task = None self._connected = False self._running = False self._client = None self._chan = None
[docs] def start(self): """Start the connection handler thread.""" if not self._running: self._running = True t = Task(target=self.__conn_handler, infinite=True) t.failure = self._catch_fail self._conn_handler_task = self._tp.run(t)
[docs] def stop(self): """Stop the connection handler thread.""" if self._running: self._running = False self.close() self._conn_handler_task.stop()
[docs] def register(self, endpoint): """Register an endpoint to this connector to so that it can communicate with the backend. The endpoint is a tuple of one ingress queue as first argument and egress as the second argument. :param endpoint: A tuple of two queues; ingress and egress. :type endpoint: :class:`BackendConnector.EndPoint` """ self._endpoints.append(endpoint)
[docs] def is_active(self): """Check whether the transport is still active.""" if self._client: t = self._client.get_transport() if t: return t.is_active() return False
[docs] def close(self): """Close open channels and transport.""" if self._chan: self._chan.close() if self._client: self._client.close() self._connected = False self._logger.critical("connection lost with the backend")
def __conn_handler(self, kill_ev): self._logger.warning("backend connection handler started") while not kill_ev.is_set(): # Try to connect to the backend try: self._connect() except BastioBackendError as ex: self.close() self._logger.critical(ex.message) # TODO: Implement a more decent reconnection strategy time.sleep(5) # Sleep 5 seconds before retrial continue # Read a message from the wire, parse it, and push it to ingress queue(s) try: json_string = self._read_message() message = MessageParser.parse(json_string) self._put_ingress(message) except socket.timeout: pass # No messages are ready to be read except BastioNetstringError as ex: self._logger.critical( "error parsing a Netstring message: {}".format(ex.message)) self.close() continue except BastioMessageError as ex: self._logger.critical( "error parsing a protocol message: {}".format(ex.message)) self.close() continue except BastioEOFError: self._logger.critical("received EOF on channel") self.close() continue # Get an item from the egress queue(s) and send it to the backend try: message = self._get_egress(timeout=0.01) # 10ms if message == None: # No message is available to send continue self._write_message(message.to_json()) except socket.timeout: # Too many un-ACK'd packets? Sliding window shut on our fingers? # We don't really know what happened, lets reschedule the last # message for retransmission anyway self._push_queue(self._tx, message) except BastioEOFError: # Message was not sent because channel was closed # re-push the message to the TX queue again and retry connection self._push_queue(self._tx, message) self.close() continue def _connect(self): """An idempotent method to connect to the backend.""" try: if self._connected: return # Prepare host keys self._client = paramiko.SSHClient() hostkeys = self._client.get_host_keys() hostkey_server_name = self._make_hostkey_entry_name(self._backend_addr) hostkeys.add(hostkey_server_name, self._backend_hostkey.get_name(), self._backend_hostkey) # Try to connect self._client.connect(hostname=self._backend_addr[0], port=self._backend_addr[1], username=self._username, pkey=self._agent_key, allow_agent=False, look_for_keys=False) self._connected = True # Open session and establish the subsystem self._chan = self._invoke_bastio() self._logger.critical("connection established with the backend") except BastioBackendError: raise except paramiko.AuthenticationException: reraise(BastioBackendError, "authentication with backend failed") except paramiko.BadHostKeyException: reraise(BastioBackendError, "backend host key does not match") except socket.error as ex: reraise(BastioBackendError, ex.strerror.lower()) except Exception: reraise(BastioBackendError) def _invoke_bastio(self): """Start a bastio subsystem on an already authenticated transport. :returns: A channel connected to the subsystem or None. """ if not self.is_active(): raise BastioBackendError("client is not connected") t = self._client.get_transport() chan = t.open_session() if not chan: raise BastioBackendError("opening a session with the backend failed") chan.settimeout(0.01) # 10ms chan.invoke_subsystem(self.Subsystem) return chan def _read_message(self): nets = Netstring(self._chan) return nets.recv() def _write_message(self, data): nets = Netstring.compose(data) remaining = len(nets) while remaining > 0: n = self._chan.send(nets) if n <= 0: raise BastioEOFError("channel closed") remaining -= n def _put_ingress(self, item): for endpoint in self._endpoints: endpoint.ingress.put(item) def _get_egress(self, timeout): for endpoint in self._endpoints: try: item = endpoint.egress.get_nowait() self._tx.put(item) except queue.Empty: pass try: return self._tx.get(timeout=timeout) except queue.Empty: return None @staticmethod def _catch_fail(failure): log = Logger() try: raise failure.exception, failure.message, failure.traceback except: msg = 'unexpected error occurred: {}'.format(failure.message) log.critical(msg, exc_info=True) @staticmethod def _make_hostkey_entry_name(addr): """We do the following to work around a paramiko inconsistency.""" if addr[1] == paramiko.config.SSH_PORT: return addr[0] return '[{}]:{}'.format(*addr)

This Page