Source code for slixmpp.stanza.message


# Slixmpp: The Slick XMPP Library
# Copyright (C) 2010  Nathanael C. Fritz
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.stanza.rootstanza import RootStanza
from slixmpp.types import MessageTypes
from slixmpp.xmlstream import StanzaBase, ET


[docs] class Message(RootStanza): """ XMPP's <message> stanzas are a "push" mechanism to send information to other XMPP entities without requiring a response. Chat clients will typically use <message> stanzas that have a type of either "chat" or "groupchat". When handling a message event, be sure to check if the message is an error response. Example <message> stanzas: .. code-block:: xml <message to="user1@example.com" from="user2@example.com"> <body>Hi!</body> </message> <message type="groupchat" to="room@conference.example.com"> <body>Hi everyone!</body> </message> Stanza Interface: - **body**: The main contents of the message. - **subject**: An optional description of the message's contents. - **mucroom**: (Read-only) The name of the MUC room that sent the message. - **mucnick**: (Read-only) The MUC nickname of message's sender. Attributes: - **types**: May be one of: normal, chat, headline, groupchat, or error. """ name = 'message' namespace = 'jabber:client' plugin_attrib = name interfaces = {'type', 'to', 'from', 'id', 'body', 'subject', 'thread', 'parent_thread', 'mucroom', 'mucnick'} sub_interfaces = {'body', 'thread'} lang_interfaces = sub_interfaces types = {'normal', 'chat', 'headline', 'error', 'groupchat'} def __init__(self, *args, recv=False, **kwargs): """ Initialize a new <message /> stanza with an optional 'id' value. Overrides StanzaBase.__init__. """ StanzaBase.__init__(self, *args, **kwargs) if not recv and self['id'] == '': if self.stream and getattr(self.stream, 'use_message_ids', False): self.set_id(self.stream.new_id())
[docs] def get_type(self) -> MessageTypes: """ Return the message type. Overrides default stanza interface behavior. Returns 'normal' if no type attribute is present. :rtype: str """ return self._get_attr('type', 'normal') # type:ignore[return-value]
def get_id(self) -> str: return self._get_attr('id') or '' def set_id(self, value: str | None) -> None: if value is None or value == '': return None self.xml.attrib['id'] = value # Compatibility fallbacks from when we supported XEP-0359 origin-id. get_origin_id = get_id set_origin_id = set_id def del_origin_id(self): sub = self.xml.find('{urn:xmpp:sid:0}origin-id') if sub is not None: self.xml.remove(sub)
[docs] def get_parent_thread(self): """Return the message thread's parent thread. :rtype: str """ thread = self.xml.find('{%s}thread' % self.namespace) if thread is not None: return thread.attrib.get('parent', '') return ''
[docs] def set_parent_thread(self, value): """Add or change the message thread's parent thread. :param str value: identifier of the thread""" thread = self.xml.find('{%s}thread' % self.namespace) if value: if thread is None: thread = ET.Element('{%s}thread' % self.namespace) self.xml.append(thread) thread.attrib['parent'] = value else: if thread is not None and 'parent' in thread.attrib: del thread.attrib['parent']
[docs] def del_parent_thread(self): """Delete the message thread's parent reference.""" thread = self.xml.find('{%s}thread' % self.namespace) if thread is not None and 'parent' in thread.attrib: del thread.attrib['parent']
[docs] def chat(self): """Set the message type to 'chat'.""" self['type'] = 'chat' return self
[docs] def normal(self): """Set the message type to 'normal'.""" self['type'] = 'normal' return self
[docs] def reply(self, body=None, clear=True): """ Create a message reply. Overrides StanzaBase.reply. Sets proper 'to' attribute if the message is from a MUC, and adds a message body if one is given. :param str body: Optional text content for the message. :param bool clear: Indicates if existing content should be removed before replying. Defaults to True. :rtype: :class:`~.Message` """ new_message = StanzaBase.reply(self, clear) if not getattr(self.stream, "is_component", False) and self['type'] == 'groupchat': new_message['to'] = new_message['to'].bare new_message['thread'] = self['thread'] new_message['parent_thread'] = self['parent_thread'] del new_message['id'] if self.stream is not None and self.stream.use_message_ids: new_message['id'] = self.stream.new_id() if body is not None: new_message['body'] = body return new_message
[docs] def get_mucroom(self): """ Return the name of the MUC room where the message originated. Read-only stanza interface. :rtype: str """ if self['type'] == 'groupchat': return self['from'].bare else: return ''
[docs] def get_mucnick(self) -> str: """ Return the nickname of the MUC user that sent the message. Read-only stanza interface. :rtype: str """ if self['type'] == 'groupchat': return self['from'].resource else: return ''
[docs] def set_mucroom(self, value): """Dummy method to prevent modification.""" pass
[docs] def del_mucroom(self): """Dummy method to prevent deletion.""" pass
[docs] def set_mucnick(self, value): """Dummy method to prevent modification.""" pass
[docs] def del_mucnick(self): """Dummy method to prevent deletion.""" pass
def __get_subject_el_str(self) -> str: return f"{{{self.namespace}}}subject" def __get_subject_element(self) -> ET.Element | None: return self.xml.find(self.__get_subject_el_str()) def set_subject(self, subject: str, **_kwargs) -> None: el = self.__get_subject_element() if el is None: el = ET.Element(self.__get_subject_el_str()) self.xml.append(el) el.text = subject self.values["subject"] = subject def get_subject(self) -> str | None: el = self.__get_subject_element() if el is None: return None return el.text def del_subject(self) -> None: el = self.__get_subject_element() if el is None: return self.xml.remove(el) del self.values["subject"]