# Slixmpp: The Slick XMPP Library
# Copyright (C) 2010 Nathanael C. Fritz
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from slixmpp.stanza.rootstanza import RootStanza
from slixmpp.xmlstream import StanzaBase, ET
ORIGIN_NAME = '{urn:xmpp:sid:0}origin-id'
[docs]class Message(RootStanza):
"""
XMPP's <message> stanzas are a "push" mechanism to send information
to other XMPP entities without requiring a response.
Chat clients will typically use <message> stanzas that have a type
of either "chat" or "groupchat".
When handling a message event, be sure to check if the message is
an error response.
Example <message> stanzas:
.. code-block:: xml
<message to="user1@example.com" from="user2@example.com">
<body>Hi!</body>
</message>
<message type="groupchat" to="room@conference.example.com">
<body>Hi everyone!</body>
</message>
Stanza Interface:
- **body**: The main contents of the message.
- **subject**: An optional description of the message's contents.
- **mucroom**: (Read-only) The name of the MUC room that sent the message.
- **mucnick**: (Read-only) The MUC nickname of message's sender.
Attributes:
- **types**: May be one of: normal, chat, headline, groupchat, or error.
"""
name = 'message'
namespace = 'jabber:client'
plugin_attrib = name
interfaces = {'type', 'to', 'from', 'id', 'body', 'subject', 'thread',
'parent_thread', 'mucroom', 'mucnick'}
sub_interfaces = {'body', 'subject', 'thread'}
lang_interfaces = sub_interfaces
types = {'normal', 'chat', 'headline', 'error', 'groupchat'}
def __init__(self, *args, recv=False, **kwargs):
"""
Initialize a new <message /> stanza with an optional 'id' value.
Overrides StanzaBase.__init__.
"""
StanzaBase.__init__(self, *args, **kwargs)
if not recv and self['id'] == '':
if self.stream:
use_ids = getattr(self.stream, 'use_message_ids', None)
if use_ids:
self.set_id(self.stream.new_id())
else:
self.del_origin_id()
[docs] def get_type(self):
"""
Return the message type.
Overrides default stanza interface behavior.
Returns 'normal' if no type attribute is present.
:rtype: str
"""
return self._get_attr('type', 'normal')
def get_id(self):
return self._get_attr('id') or ''
def get_origin_id(self):
sub = self.xml.find(ORIGIN_NAME)
if sub is not None:
return sub.attrib.get('id') or ''
return ''
def _set_ids(self, value) -> None:
if value is None or value == '':
return None
self.xml.attrib['id'] = value
if self.stream:
if not getattr(self.stream, 'use_origin_id', False):
self.del_origin_id()
return None
sub = self.xml.find(ORIGIN_NAME)
if sub is not None:
sub.attrib['id'] = value
else:
sub = ET.Element(ORIGIN_NAME)
sub.attrib['id'] = value
self.xml.append(sub)
def set_id(self, value):
return self._set_ids(value)
def set_origin_id(self, value: str):
return self._set_ids(value)
def del_origin_id(self):
sub = self.xml.find(ORIGIN_NAME)
if sub is not None:
self.xml.remove(sub)
[docs] def get_parent_thread(self):
"""Return the message thread's parent thread.
:rtype: str
"""
thread = self.xml.find('{%s}thread' % self.namespace)
if thread is not None:
return thread.attrib.get('parent', '')
return ''
[docs] def set_parent_thread(self, value):
"""Add or change the message thread's parent thread.
:param str value: identifier of the thread"""
thread = self.xml.find('{%s}thread' % self.namespace)
if value:
if thread is None:
thread = ET.Element('{%s}thread' % self.namespace)
self.xml.append(thread)
thread.attrib['parent'] = value
else:
if thread is not None and 'parent' in thread.attrib:
del thread.attrib['parent']
[docs] def del_parent_thread(self):
"""Delete the message thread's parent reference."""
thread = self.xml.find('{%s}thread' % self.namespace)
if thread is not None and 'parent' in thread.attrib:
del thread.attrib['parent']
[docs] def chat(self):
"""Set the message type to 'chat'."""
self['type'] = 'chat'
return self
[docs] def normal(self):
"""Set the message type to 'normal'."""
self['type'] = 'normal'
return self
[docs] def reply(self, body=None, clear=True):
"""
Create a message reply.
Overrides StanzaBase.reply.
Sets proper 'to' attribute if the message is from a MUC, and
adds a message body if one is given.
:param str body: Optional text content for the message.
:param bool clear: Indicates if existing content should be removed
before replying. Defaults to True.
:rtype: :class:`~.Message`
"""
new_message = StanzaBase.reply(self, clear)
if not getattr(self.stream, "is_component", False) and self['type'] == 'groupchat':
new_message['to'] = new_message['to'].bare
new_message['thread'] = self['thread']
new_message['parent_thread'] = self['parent_thread']
del new_message['id']
if self.stream is not None and self.stream.use_message_ids:
new_message['id'] = self.stream.new_id()
if body is not None:
new_message['body'] = body
return new_message
[docs] def get_mucroom(self):
"""
Return the name of the MUC room where the message originated.
Read-only stanza interface.
:rtype: str
"""
if self['type'] == 'groupchat':
return self['from'].bare
else:
return ''
[docs] def get_mucnick(self):
"""
Return the nickname of the MUC user that sent the message.
Read-only stanza interface.
:rtype: str
"""
if self['type'] == 'groupchat':
return self['from'].resource
else:
return ''
[docs] def set_mucroom(self, value):
"""Dummy method to prevent modification."""
pass
[docs] def del_mucroom(self):
"""Dummy method to prevent deletion."""
pass
[docs] def set_mucnick(self, value):
"""Dummy method to prevent modification."""
pass
[docs] def del_mucnick(self):
"""Dummy method to prevent deletion."""
pass