import asyncio
import logging
from typing import Any, Dict, List

import aiohttp

from .. import gitter
from ..gitter import AioGitterAPI
from .commands import command_routes

logger = logging.getLogger(__name__)  # pylint: disable=invalid-name


{ message: 'message', level='error|normal' }

[docs]class GitterListener: """Listens to messages in a Gitter chat room Args: app: Web Server Application api: Gitter API object rooms: Map containing rooms and their respective github user/repo """ def __init__(self, app: aiohttp.web.Application, token: str, rooms: Dict[str, str], session: aiohttp.ClientSession, ghappapi) -> None: self.rooms = rooms self._ghappapi = ghappapi self._api = AioGitterAPI(app['client_session'], token) self._user: gitter.User = None self._tasks: List[Any] = [] self._session = session app.on_startup.append(self.start) app.on_shutdown.append(self.shutdown) def __str__(self) -> str: return f"{self.__class__.__name__}"
[docs] async def start(self, app: aiohttp.web.Application) -> None: """Start listeners""" self._user = await self._api.get_user() logger.debug("%s: User Info: %s", self, self._user) for room in await self._api.list_rooms(): logger.debug("%s: Room Info: %s", self, room) logger.debug("%s: Groups Info: %s", self, await self._api.list_groups()) self._tasks = [app.loop.create_task(self.listen(room)) for room in self.rooms]
[docs] async def shutdown(self, _app: aiohttp.web.Application) -> None: """Send cancel signal to listener""""%s: Shutting down listeners", self) for task in self._tasks: task.cancel() for task in self._tasks: await task"%s: Shut down all listeners", self)
[docs] async def listen(self, room_name: str) -> None: """Main run loop""" try: user, repo = self.rooms[room_name].split('/') logger.error("Listening in %s for repo %s/%s", room_name, user, repo) message = None while True: try: room = await self._api.get_room(room_name)"%s: joining %s", self, room_name) await self._api.join_room(self._user, room)"%s: listening in %s", self, room_name) async for message in self._api.iter_chat(room): # getting a new ghapi object for every message because our # creds time out. Ideally, the api class would take care of that. ghapi = await self._ghappapi.get_github_api(False, user, repo) await self.handle_msg(room, message, ghapi) # on timeouts, we just run log into the room again except (aiohttp.ClientConnectionError, asyncio.TimeoutError): pass # http errors just get logged except aiohttp.ClientResponseError as exc: logger.exception("HTTP Error Code %s while listening to room %s", exc.code, room_name) # asyncio cancellation needs to be passed up except asyncio.CancelledError: # pylint: disable=try-except-raise raise # the rest, we just log so that we remain online after an error except Exception: # pylint: disable=broad-except logger.exception("Unexpected exception caught. Last message: '%s'", message) await asyncio.sleep(1) except asyncio.CancelledError: logger.error("%s: stopped listening in %s", self, room_name) # we need a new session here as the one we got passed might have been # closed already when we get cancelled async with aiohttp.ClientSession() as session: self._api._session = session await self._api.leave_room(self._user, room) logger.error("%s: left room %s", self, room_name)
[docs] async def handle_msg(self, room: gitter.Room, message: gitter.Message, ghapi) -> None: """Parse Gitter message and dispatch via command_routes""" await self._api.mark_as_read(self._user, room, []) if not in (m.userId for m in message.mentions): if self._user.username.lower() in (m.screenName.lower() for m in message.mentions): await self._api.send_message(room, "@%s - are you talking to me?", message.fromUser.username) return command = message.text.strip().lstrip('@'+self._user.username).strip() if command == message.text.strip(): await self._api.send_message(room, "Hmm? Someone talking about me?", message.fromUser.username) return cmd, *args = command.split() issue_number = None try: if args[-1][0] == '#': issue_number = int(args[-1][1:]) args.pop() except (ValueError, IndexError): pass response = await command_routes.dispatch(cmd.lower(), ghapi, issue_number, message.fromUser.username, *args) if response: await self._api.send_message(room, "@%s: %s", message.fromUser.username, response) else: await self._api.send_message(room, "@%s: command failed", message.fromUser.username)