import collections
import logging
import re
import time
import irc.client
from . import message
__all__ = ['Event3', 'ServerConnection3']
log = logging.getLogger(__name__)
[docs]class Event3(irc.client.Event):
"""An IRC event with tags
See `tag specification <http://ircv3.net/specs/core/message-tags-3.2.html>`_.
"""
[docs] def __init__(self, type, source, target, arguments=None, tags=None):
"""Initialize a new event
:param type: a string describing the event
:type type: :class:`str`
:param source: The originator of the event. NickMask or server
:type source: :class:`irc.client.NickMask` | :class:`str`
:param target: The target of the event
:type target: :class:`str`
:param arguments: Any specific event arguments
:type arguments: :class:`list` | None
:raises: None
"""
super(Event3, self).__init__(type, source, target, arguments)
self.tags = tags
def __repr__(self, ): # pragma: no cover
"""Return a canonical representation of the object
:rtype: :class:`str`
:raises: None
"""
args = (self.__class__.__name__, self.type, self.source, self.target, self.arguments, self.tags)
return '<%s %s, %s to %s, %s, tags: %s>' % args
def __eq__(self, other):
"""Return True, if the events share equal attributes
:param other: the other event to compare
:type other: :class:`Event3`
:returns: True, if equal
:rtype: :class:`bool`
:raises: None
"""
return self.type == other.type and\
self.source == other.source and\
self.target == other.target and\
self.arguments == other.arguments and\
self.tags == other.tags
[docs]class ServerConnection3(irc.client.ServerConnection):
"""ServerConncetion that can handle irc v3 tags
Tags are only handled for privmsg, pubmsg, notice events.
All other events might be handled the old way.
"""
_cmd_pat = "^(@(?P<tags>[^ ]+) +)?(:(?P<prefix>[^ ]+) +)?(?P<command>[^ ]+)( *(?P<argument> .+))?"
_rfc_1459_command_regexp = re.compile(_cmd_pat)
[docs] def __init__(self, reactor, msglimit=20, limitinterval=30):
"""Initialize a connection that has a limit to sending messages
:param reactor: the reactor of the connection
:type reactor: :class:`irc.client.Reactor`
:param msglimit: the maximum number of messages to send in limitinterval
:type msglimit: :class:`int`
:param limitinterval: the timeframe in seconds in which you can only send
as many messages as in msglimit
:type limitinterval: :class:`int`
:raises: None
"""
super(ServerConnection3, self).__init__(reactor)
self.sentmessages = collections.deque(maxlen=msglimit + 1)
"""A queue with timestamps form the last sent messages.
So we can track if we send to many messages."""
self.limitinterval = limitinterval
"""the timeframe in seconds in which you can only send
as many messages as in :data:`ServerConncetion3msglimit`"""
[docs] def get_waittime(self):
"""Return the appropriate time to wait, if we sent too many messages
:returns: the time to wait in seconds
:rtype: :class:`float`
:raises: None
"""
now = time.time()
self.sentmessages.appendleft(now)
if len(self.sentmessages) == self.sentmessages.maxlen:
# check if the oldes message is older than
# limited by self.limitinterval
oldest = self.sentmessages[-1]
waittime = self.limitinterval - (now - oldest)
if waittime > 0:
return waittime + 1 # add a little buffer
return 0
[docs] def send_raw(self, string):
"""Send raw string to the server.
The string will be padded with appropriate CR LF.
If too many messages are sent, this will call
:func:`time.sleep` until it is allowed to send messages again.
:param string: the raw string to send
:type string: :class:`str`
:returns: None
:raises: :class:`irc.client.InvalidCharacters`,
:class:`irc.client.MessageTooLong`,
:class:`irc.client.ServerNotConnectedError`
"""
waittime = self.get_waittime()
if waittime:
log.debug('Sent too many messages. Waiting %s seconds',
waittime)
time.sleep(waittime)
return super(ServerConnection3, self).send_raw(string)
def _process_line(self, line):
"""Process the given line and handle the events
:param line: the raw message
:type line: :class:`str`
:returns: None
:rtype: None
:raises: None
"""
m = self._rfc_1459_command_regexp.match(line)
prefix = m.group('prefix')
tags = self._process_tags(m.group('tags'))
source = self._process_prefix(prefix)
command = self._process_command(m.group('command'))
arguments = self._process_arguments(m.group('argument'))
if not self.real_server_name:
self.real_server_name = prefix
# Translate numerics into more readable strings.
command = irc.events.numeric.get(command, command)
if command not in ["privmsg", "notice"]:
return super(ServerConnection3, self)._process_line(line)
event = Event3("all_raw_messages", self.get_server_name(),
None, [line], tags=tags)
self._handle_event(event)
target, msg = arguments[0], arguments[1]
messages = irc.ctcp.dequote(msg)
command = self._resolve_command(command, target)
for m in messages:
self._handle_message(tags, source, command, target, m)
def _resolve_command(self, command, target):
"""Get the correct event for the command
Only for 'privmsg' and 'notice' commands.
:param command: The command string
:type command: :class:`str`
:param target: either a user or a channel
:type target: :class:`str`
:returns: the correct event type
:rtype: :class:`str`
:raises: None
"""
if command == "privmsg":
if irc.client.is_channel(target):
command = "pubmsg"
else:
if irc.client.is_channel(target):
command = "pubnotice"
else:
command = "privnotice"
return command
def _handle_message(self, tags, source, command, target, msg):
"""Construct the correct events and handle them
:param tags: the tags of the message
:type tags: :class:`list` of :class:`message.Tag`
:param source: the sender of the message
:type source: :class:`str`
:param command: the event type
:type command: :class:`str`
:param target: the target of the message
:type target: :class:`str`
:param msg: the content
:type msg: :class:`str`
:returns: None
:rtype: None
:raises: None
"""
if isinstance(msg, tuple):
if command in ["privmsg", "pubmsg"]:
command = "ctcp"
else:
command = "ctcpreply"
msg = list(msg)
log.debug("tags: %s, command: %s, source: %s, target: %s, "
"arguments: %s", tags, command, source, target, msg)
event = Event3(command, source, target, msg, tags=tags)
self._handle_event(event)
if command == "ctcp" and msg[0] == "ACTION":
event = Event3("action", source, target, msg[1:], tags=tags)
self._handle_event(event)
else:
log.debug("tags: %s, command: %s, source: %s, target: %s, "
"arguments: %s", tags, command, source, target, [msg])
event = Event3(command, source, target, [msg], tags=tags)
self._handle_event(event)
def _process_tags(self, tags):
"""Process the tags of the message
:param tags: the tags string of a message
:type tags: :class:`str` | None
:returns: list of tags
:rtype: :class:`list` of :class:`message.Tag`
:raises: None
"""
if not tags:
return []
return [message.Tag.from_str(x) for x in tags.split(';')]
def _process_prefix(self, prefix):
"""Process the prefix of the message and return the source
:param prefix: The prefix string of a message
:type prefix: :class:`str` | None
:returns: The prefix wrapped in :class:`irc.client.NickMask`
:rtype: :class:`irc.client.NickMask` | None
:raises: None
"""
if not prefix:
return None
return irc.client.NickMask(prefix)
def _process_command(self, command):
"""Return a lower string version of the command
:param command: the command of the message
:type command: :class:`str` | None
:returns: The lower case version
:rtype: :class:`str` | None
:raises: None
"""
if not command:
return None
return command.lower()
def _process_arguments(self, arguments):
"""Process the arguments
:param arguments: arguments string of a message
:type arguments: :class:`str` | None
:returns: A list of arguments
:rtype: :class:`list` of :class:`str` | None
:raises: None
"""
if not arguments:
return None
a = arguments.split(" :", 1)
arglist = a[0].split()
if len(a) == 2:
arglist.append(a[1])
return arglist