====== Matrix Bot AI - Полная документация ======
===== Описание ======
Matrix Bot AI - это интеллектуальный чат-бот для платформы Matrix, работающий на n8n. Бот запоминает историю разговоров в PostgreSQL и использует OpenAI GPT-4o-mini для генерации ответов с учетом контекста предыдущих сообщений.
**Основные возможности:**
* Запоминание истории разговоров в разных комнатах
* Использование контекста при ответах
* Сохранение ID и имен собеседников
* Автоматическая сортировка истории от новых к старым
===== Архитектура ======
==== Компоненты ====
**1. Matrix сервер** - отправляет вебхук при новом сообщении
**2. n8n Workflow** - обрабатывает сообщение и генерирует ответ
**3. PostgreSQL** - хранит историю разговоров
**4. OpenAI API** - генерирует ответы через GPT-4o-mini
**5. Matrix Bot сервис** - отправляет сообщения обратно в чат
===== Логика работы ======
==== Поток данных ====
1. **Webhook приходит** → Формирует данные сообщения (текст, room_id, sender, sender_name)
2. **Query Memory** → Получает последние 10 сообщений из истории этой комнаты
3. **Build Context** → Собирает контекст разговора для AI с сохранением порядка (новые сообщения в конце)
4. **AI Agent** → OpenAI генерирует ответ на основе контекста
5. **Save and Send** → Сохраняет результат в БД и отправляет сообщение в чат
6. **Response** → Возвращает успешный ответ на вебхук
==== Структура данных ====
**Таблица conversation_history:**
* **room_id** - VARCHAR - ID комнаты Matrix
* **sender** - VARCHAR - Matrix ID отправителя (@admin:domain)
* **sender_name** - VARCHAR - Отображаемое имя в чате (Николай)
* **message** - TEXT - Текст сообщения
* **bot_response** - TEXT - Ответ бота
* **created_at** - TIMESTAMP - Время создания записи
===== Подготовка базы данных ======
Выполните эти SQL команды в PgAdmin:
-- Создание таблицы conversation_history
CREATE TABLE conversation_history (
id SERIAL PRIMARY KEY,
room_id VARCHAR(255) NOT NULL,
sender VARCHAR(255) NOT NULL,
sender_name VARCHAR(255),
message TEXT,
bot_response TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- Создание индекса для быстрого поиска по room_id
CREATE INDEX idx_room_id ON conversation_history(room_id);
CREATE INDEX idx_created_at ON conversation_history(created_at DESC);
-- Добавление колонки sender_name если таблица уже существует
ALTER TABLE conversation_history
ADD COLUMN sender_name VARCHAR(255);
===== Nodes в Workflow ======
==== 1. Webhook ====
**Тип:** n8n-nodes-base.webhook
**Назначение:** Получает HTTP POST запрос от Matrix сервера
**Параметры:**
* Path: matrix-bot-ai
* HTTP Method: POST
* Response Mode: responseNode
**Входящие данные:**
{
"body": {
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"is_dm": false
}
}
==== 2. Format Input ====
**Тип:** n8n-nodes-base.code (JavaScript)
**Назначение:** Форматирует входящие данные в стандартный вид
**Код:**
const body = $json.body;
return {
text: body.message || body.text,
room_id: body.room_id,
sender: body.sender || '',
sender_name: body.sender_name || 'User',
originalMessage: body.message || body.text,
is_group: !body.is_dm
};
**Выходные данные:**
{
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"originalMessage": "сообщение",
"is_group": false
}
==== 3. Query Memory ====
**Тип:** n8n-nodes-base.postgres
**Назначение:** Получает историю из PostgreSQL
**Операция:** executeQuery
**SQL Query:**
SELECT sender, sender_name, message, bot_response, created_at
FROM conversation_history
WHERE room_id = '{{room_id}}'
ORDER BY created_at DESC
LIMIT 10
**Параметр Query (в n8n):**
SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '{{ "'" + $json.room_id.replace(/'/g, "''") + "'" }}' ORDER BY created_at DESC LIMIT 10
**Выходные данные:** Массив последних 10 сообщений (или пусто если их нет)
==== 4. Build Context Code ====
**Тип:** n8n-nodes-base.code (JavaScript)
**Назначение:** Собирает контекст разговора для AI из памяти
**Код:**
// Get Format Input from $node
const formatInput = $node['Format Input'].json;
// Get all items from Query Memory
const allItems = $input.all();
const memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];
let contextText = '';
if (memoryItems.length > 0) {
contextText = 'Recent conversation history:\n\n';
// NEWEST FIRST - iterate from 0 to end (last item is most recent)
for (let i = 0; i < memoryItems.length; i++) {
const record = memoryItems[i].json || {};
if (record.message && record.sender && record.bot_response && record.created_at) {
const ts = new Date(record.created_at).toLocaleString('ru-RU');
const senderName = record.sender_name || record.sender;
contextText += `[${ts}] ${senderName}: ${record.message}\n`;
contextText += ` → Jarvice: ${record.bot_response}\n\n`;
}
}
}
// IMPORTANT: Return ALL fields from Format Input
return {
context: contextText,
text: formatInput.text,
room_id: formatInput.room_id,
sender: formatInput.sender,
sender_name: formatInput.sender_name,
originalMessage: formatInput.originalMessage,
is_group: formatInput.is_group
};
**Выходные данные:**
{
"context": "Recent conversation history:\n\n[время] Николай: 3+2\n → Jarvice: 5\n\n",
"text": "сообщение",
"room_id": "!abc123:domain",
"sender": "@user:domain",
"sender_name": "User Name",
"originalMessage": "сообщение",
"is_group": false
}
==== 5. AI Agent ====
**Тип:** @n8n/n8n-nodes-langchain.agent
**Назначение:** LangChain Agent использующий OpenAI модель
**Параметры:**
* Prompt Type: define
* Text: ''{{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}''
* System Message: (см. ниже - НЕ МЕНЯТЬ!)
**System Message:**
Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.
Ты помощник Администратора (Николай).
Отвечай кратко, используй emoji.
Обязательно используй контекст предыдущих сообщений для лучшего ответа.
ПРАВИЛА ЧАТА (сообщай их ТОЛЬКО если тебя о них спросили есть они или нет или просят сказать какие правила в этом чате, перечисляй только эти правила и не добавляй другие от себя).
1. Сообщения в чате сохраняются одну неделю.
2. Все сообщения старше недели удаляются.
3. В чате есть AI ассистент - может помочь с вопросами.
4. Соблюдай законодательство РФ.
==== 6. OpenAI Model ====
**Тип:** @n8n/n8n-nodes-langchain.lmChatOpenAi
**Назначение:** Подключение к OpenAI API
**Параметры:**
* Model: gpt-4o-mini
* Credentials: aitunnel (ваш OpenAI API key)
==== 7. Save and Send ====
**Тип:** n8n-nodes-base.code (JavaScript)
**Назначение:** Подготавливает данные для сохранения и отправки
**Код:**
const ai = $json.output || $node['AI Agent'].json.output || '';
const buildContext = $node['Build Context Code'].json;
const roomId = buildContext.room_id || '';
const isGroup = buildContext.is_group || false;
const sender = buildContext.sender || '';
const senderName = buildContext.sender_name || '';
const originalMessage = buildContext.originalMessage || '';
const esc = (s) => {
if (!s) return 'NULL';
return "'" + String(s).replace(/'/g, "''") + "'";
};
const query = "INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (" +
esc(roomId) + ", " +
esc(sender) + ", " +
esc(senderName) + ", " +
esc(originalMessage) + ", " +
esc(ai) + ")";
return {
message: ai,
room_id: roomId,
sender_id: isGroup ? sender : null,
sender_name: isGroup ? senderName : null,
query: query
};
**Выходные данные:**
{
"message": "Ответ бота",
"room_id": "!abc123:domain",
"sender_id": "@user:domain",
"sender_name": "User Name",
"query": "INSERT INTO conversation_history..."
}
==== 8. Send to Matrix ====
**Тип:** n8n-nodes-base.httpRequest
**Назначение:** Отправляет ответ в Matrix через Matrix Bot сервис
**Параметры:**
* URL: http://matrix_bot:5000/send_message
* Method: POST
* Send Body: true
* Specify Body: json
**JSON Body:**
{{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}
==== 9. Save to Database ====
**Тип:** n8n-nodes-base.postgres
**Назначение:** Сохраняет разговор в PostgreSQL
**Параметры:**
* Operation: executeQuery
* Query: ''{{ $json.query }}''
==== 10. Respond to Webhook ====
**Тип:** n8n-nodes-base.respondToWebhook
**Назначение:** Отправляет успешный ответ на вебхук
**Параметры:**
* Respond With: json
* Response Body: ''{{ JSON.stringify({ success: true }) }}''
===== Connections (Связи между nodes) ======
**Webhook** → Format Input
**Format Input** → Query Memory (index 0)
**Format Input** → Build Context Code (index 1)
**Query Memory** → Build Context Code (index 0)
**Build Context Code** → AI Agent
**OpenAI Model** → AI Agent (ai_languageModel)
**AI Agent** → Save and Send
**Save and Send** → Send to Matrix
**Save and Send** → Save to Database
**Send to Matrix** → Respond to Webhook
===== JSON для импорта в n8n ======
Используйте этот JSON для создания workflow в n8n:
{
"name": "Matrix Bot AI",
"nodes": [
{
"parameters": {
"httpMethod": "POST",
"path": "matrix-bot-ai",
"responseMode": "responseNode",
"options": {}
},
"id": "webhook-1",
"name": "Webhook",
"type": "n8n-nodes-base.webhook",
"position": [-400, 304],
"webhookId": "2dad965f-08e2-412f-960a-b16c91dea686",
"typeVersion": 2
},
{
"parameters": {
"language": "javascript",
"jsCode": "const body = $json.body;\nreturn {\n text: body.message || body.text,\n room_id: body.room_id,\n sender: body.sender || '',\n sender_name: body.sender_name || 'User',\n originalMessage: body.message || body.text,\n is_group: !body.is_dm\n};"
},
"id": "format-input",
"name": "Format Input",
"type": "n8n-nodes-base.code",
"position": [-144, 304],
"typeVersion": 1
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{ \"SELECT sender, sender_name, message, bot_response, created_at FROM conversation_history WHERE room_id = '\" + $json.room_id.replace(/'/g, \"''\") + \"' ORDER BY created_at DESC LIMIT 10\" }}",
"additionalFields": {}
},
"id": "query-memory",
"name": "Query Memory",
"type": "n8n-nodes-base.postgres",
"position": [0, 464],
"typeVersion": 1,
"credentials": {
"postgres": {
"id": "Sy23a0Zvp6PlFt6V",
"name": "Postgres matrix_bot_memory"
}
}
},
{
"parameters": {
"language": "javascript",
"jsCode": "// Get Format Input from $node\nconst formatInput = $node['Format Input'].json;\n\n// Get all items from Query Memory\nconst allItems = $input.all();\nconst memoryItems = allItems.filter(item => item.json && item.json.message && item.json.sender) || [];\n\nlet contextText = '';\n\nif (memoryItems.length > 0) {\n contextText = 'Recent conversation history:\\n\\n';\n // NEWEST FIRST - iterate from 0 to end (last item is most recent)\n for (let i = 0; i < memoryItems.length; i++) {\n const record = memoryItems[i].json || {};\n if (record.message && record.sender && record.bot_response && record.created_at) {\n const ts = new Date(record.created_at).toLocaleString('ru-RU');\n const senderName = record.sender_name || record.sender;\n contextText += `[${ts}] ${senderName}: ${record.message}\\n`;\n contextText += ` → Jarvice: ${record.bot_response}\\n\\n`;\n }\n }\n}\n\n// IMPORTANT: Return ALL fields from Format Input\nreturn {\n context: contextText,\n text: formatInput.text,\n room_id: formatInput.room_id,\n sender: formatInput.sender,\n sender_name: formatInput.sender_name,\n originalMessage: formatInput.originalMessage,\n is_group: formatInput.is_group\n};"
},
"id": "build-context-code",
"name": "Build Context Code",
"type": "n8n-nodes-base.code",
"position": [160, 304],
"typeVersion": 1
},
{
"parameters": {
"promptType": "define",
"text": "={{ $json.context + 'User (' + $json.sender_name + '): ' + $json.text }}",
"options": {
"systemMessage": "Ты полезный AI ассистент для Matrix чата по имени Jarvice🤖.\nТы помощник Администратора (Николай).\nОтвечай кратко, используй emoji.\nОбязательно используй контекст предыдущих сообщений для лучшего ответа. Отвечай с числовыми результатами и референциями на предыдущие вычисления."
}
},
"id": "agent-1",
"name": "AI Agent",
"type": "@n8n/n8n-nodes-langchain.agent",
"position": [304, 304],
"typeVersion": 1.9
},
{
"parameters": {
"model": {
"mode": "list",
"value": "gpt-4o-mini"
},
"options": {},
"builtInTools": {}
},
"id": "openai-1",
"name": "OpenAI Model",
"type": "@n8n/n8n-nodes-langchain.lmChatOpenAi",
"position": [352, 512],
"typeVersion": 1.3,
"credentials": {
"openAiApi": {
"id": "P1rSZeqDWn05zkK1",
"name": "aitunnel"
}
}
},
{
"parameters": {
"language": "javascript",
"jsCode": "const ai = $json.output || $node['AI Agent'].json.output || '';\nconst buildContext = $node['Build Context Code'].json;\n\nconst roomId = buildContext.room_id || '';\nconst isGroup = buildContext.is_group || false;\nconst sender = buildContext.sender || '';\nconst senderName = buildContext.sender_name || '';\nconst originalMessage = buildContext.originalMessage || '';\n\nconst esc = (s) => {\n if (!s) return 'NULL';\n return \"'\" + String(s).replace(/'/g, \"''\") + \"'\";\n};\n\nconst query = \"INSERT INTO conversation_history (room_id, sender, sender_name, message, bot_response) VALUES (\" +\n esc(roomId) + \", \" +\n esc(sender) + \", \" +\n esc(senderName) + \", \" +\n esc(originalMessage) + \", \" +\n esc(ai) + \")\";\n\nreturn {\n message: ai,\n room_id: roomId,\n sender_id: isGroup ? sender : null,\n sender_name: isGroup ? senderName : null,\n query: query\n};"
},
"id": "save-and-send",
"name": "Save and Send",
"type": "n8n-nodes-base.code",
"position": [560, 304],
"typeVersion": 1
},
{
"parameters": {
"method": "POST",
"url": "http://matrix_bot:5000/send_message",
"sendBody": true,
"specifyBody": "json",
"jsonBody": "={{ JSON.stringify({room_id: $json.room_id, message: $json.message, sender_id: $json.sender_id, sender_name: $json.sender_name}) }}",
"options": {}
},
"id": "http-1",
"name": "Send to Matrix",
"type": "n8n-nodes-base.httpRequest",
"position": [752, 208],
"typeVersion": 4.2
},
{
"parameters": {
"operation": "executeQuery",
"query": "={{ $json.query }}",
"additionalFields": {}
},
"id": "save-to-db",
"name": "Save to Database",
"type": "n8n-nodes-base.postgres",
"position": [752, 400],
"typeVersion": 1,
"credentials": {
"postgres": {
"id": "Sy23a0Zvp6PlFt6V",
"name": "Postgres matrix_bot_memory"
}
}
},
{
"parameters": {
"respondWith": "json",
"responseBody": "={{ JSON.stringify({ success: true }) }}",
"options": {}
},
"id": "respond-1",
"name": "Respond to Webhook",
"type": "n8n-nodes-base.respondToWebhook",
"position": [960, 208],
"typeVersion": 1.1
}
],
"connections": {
"Webhook": {
"main": [
[
{
"node": "Format Input",
"type": "main",
"index": 0
}
]
]
},
"AI Agent": {
"main": [
[
{
"node": "Save and Send",
"type": "main",
"index": 0
}
]
]
},
"Format Input": {
"main": [
[
{
"node": "Query Memory",
"type": "main",
"index": 0
},
{
"node": "Build Context Code",
"type": "main",
"index": 1
}
]
]
},
"OpenAI Model": {
"ai_languageModel": [
[
{
"node": "AI Agent",
"type": "ai_languageModel",
"index": 0
}
]
]
},
"Query Memory": {
"main": [
[
{
"node": "Build Context Code",
"type": "main",
"index": 0
}
]
]
},
"Save and Send": {
"main": [
[
{
"node": "Send to Matrix",
"type": "main",
"index": 0
},
{
"node": "Save to Database",
"type": "main",
"index": 0
}
]
]
},
"Send to Matrix": {
"main": [
[
{
"node": "Respond to Webhook",
"type": "main",
"index": 0
}
]
]
},
"Build Context Code": {
"main": [
[
{
"node": "AI Agent",
"type": "main",
"index": 0
}
]
]
}
},
"settings": {
"executionOrder": "v1",
"saveDataErrorExecution": "all",
"saveDataSuccessExecution": "all",
"saveManualExecutions": true,
"saveExecutionProgress": true
}
}
===== Процесс установки ======
==== 1. Подготовка базы данных ====
1. Откройте PgAdmin
2. Выполните SQL команды из раздела "Подготовка базы данных"
3. Проверьте что таблица создана
==== 2. Импорт workflow в n8n ====
1. Скопируйте JSON из раздела "JSON для импорта в n8n"
2. В n8n нажмите "+" > "Import from URL or paste JSON"
3. Вставьте JSON и нажмите "Import"
4. Откройте импортированный workflow
==== 3. Настройка credentials ====
1. Откройте workflow
2. Найдите node "Query Memory" и установите credentials PostgreSQL
3. Найдите node "OpenAI Model" и установите credentials OpenAI API
4. Найдите node "Save to Database" и установите credentials PostgreSQL
==== 4. Проверка и активация ====
1. Нажмите "Test" чтобы проверить webhook URL
2. Скопируйте webhook URL
3. Настройте Matrix сервер на отправку вебхуков на этот URL
4. Активируйте workflow (кнопка "Activate")
===== Примеры использования ======
==== Пример 1: Простой расчет ====
**Пользователь:** Сколько будет 7 + 3?
**Бот:** 7 + 3 = 10 ✨
**Пользователь:** А сколько было в предыдущем вычислении?
**Бот:** В предыдущем вычислении было: 7 + 3 = 10 📝
==== Пример 2: Разговор с контекстом ====
**Пользователь:** Какой сегодня день?
**Бот:** Сегодня День недели, число месяца 📅
**Пользователь:** А вчера какой был?
**Бот:** Вчера был [предыдущий день] 📆
===== Решение проблем ======
==== Бот не отвечает ====
1. Проверьте что workflow активен
2. Проверьте credentials PostgreSQL и OpenAI
3. Проверьте что Matrix сервер отправляет вебхуки на правильный URL
4. Посмотрите Executions в n8n - там будут ошибки
==== Сообщение: column "sender_name" does not exist ====
Выполните SQL команду:
ALTER TABLE conversation_history
ADD COLUMN sender_name VARCHAR(255);
==== Бот забывает историю ====
1. Проверьте что SQL query в "Query Memory" выглядит правильно
2. Проверьте что сообщения сохраняются в БД (Look in Save to Database node output)
3. Проверьте что room_id совпадает в разных сообщениях
==== OpenAI API error ====
1. Проверьте что API key валидный
2. Проверьте что у вас есть баланс в OpenAI
3. Проверьте что используется правильная модель (gpt-4o-mini)
===== Matrix Bot сервис (Python) ======
==== Описание ====
Matrix Bot - это Python сервис, который подключается к Matrix серверу и:
* Получает новые сообщения из чатов
* Отправляет их в n8n webhook для обработки AI
* Получает ответы через HTTP API и отправляет в Matrix
* Поддерживает шифрованные комнаты (Megolm)
* Приветствует новых пользователей в комнате "Общая"
==== Требования ====
* Python 3.9+
* python-nio (Matrix client library)
* aiohttp (async HTTP client)
* python-dotenv (для конфигурации)
==== Установка ====
pip install python-nio aiohttp python-dotenv
==== Конфигурация (.env файл) ====
MATRIX_HOMESERVER=https://syna.digitizepro.tech
MATRIX_USER=@bot:syna.digitizepro.tech
MATRIX_PASSWORD=your_password_here
PICKLE_KEY=your_encryption_key_here
N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai
N8N_TIMEOUT=30
HTTP_API_PORT=5000
==== Полный код бота ====
**Файл: matrix_bot.py**
import asyncio
import os
import logging
from logging.handlers import RotatingFileHandler
import json
import aiohttp
from pathlib import Path
from aiohttp import web
from nio import (
AsyncClient,
AsyncClientConfig,
RoomMessageText,
MegolmEvent,
InviteMemberEvent,
RoomMemberEvent,
LoginResponse,
KeyVerificationStart,
KeyVerificationCancel,
KeyVerificationKey,
KeyVerificationMac,
LocalProtocolError,
)
# ===== Настройка логирования =====
os.makedirs('/app/logs', exist_ok=True)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler = RotatingFileHandler(
'/app/logs/matrix_bot.log',
maxBytes=50*1024*1024,
backupCount=10,
encoding='utf-8'
)
file_handler.setFormatter(formatter)
file_handler.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
console_handler.setFormatter(formatter)
console_handler.setLevel(logging.INFO)
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(file_handler)
root_logger.addHandler(console_handler)
logger = logging.getLogger("matrix-bot")
# ===== Конфигурация =====
HOMESERVER = os.getenv("MATRIX_HOMESERVER", "https://syna.digitizepro.tech")
USER = os.getenv("MATRIX_USER", "@bot:syna.digitizepro.tech")
PASSWORD = os.getenv("MATRIX_PASSWORD")
STORE_PATH = "/app/store"
PICKLE_KEY = os.getenv("PICKLE_KEY", "default_insecure_key_change_me")
BOT_NICKNAME = "ChatBot"
CREDENTIALS_FILE = os.path.join(STORE_PATH, "credentials.json")
# n8n интеграция
N8N_WEBHOOK_URL = os.getenv("N8N_WEBHOOK_URL", "http://n8n:5678/webhook/matrix-bot-ai")
N8N_TIMEOUT = int(os.getenv("N8N_TIMEOUT", "30"))
# HTTP API для ответов от n8n
HTTP_API_PORT = int(os.getenv("HTTP_API_PORT", "5000"))
# Настройка клиента с шифрованием
config = AsyncClientConfig(
encryption_enabled=True,
pickle_key=PICKLE_KEY,
store_sync_tokens=True
)
# Глобальные переменные
client = None
app = None
http_runner = None
sync_error_count = 0
# ===== НОВОЕ: Функция очистки sync токенов =====
def clear_sync_tokens():
"""Очищает sync токены для полной пересинхронизации после перезагрузки"""
try:
store_path = Path(STORE_PATH)
# Список файлов токенов которые нужно удалить
token_files = [
store_path / "sync_token.json",
store_path / ".matrix-nio-sync-token"
]
for token_file in token_files:
if token_file.exists():
token_file.unlink()
logger.info(f"🗑️ Удалён sync token файл: {token_file}")
# Удаляем базу комнат для полной пересинхронизации
db_file = store_path / "rooms.json"
if db_file.exists():
db_file.unlink()
logger.info(f"🗑️ Удалена база комнат для пересинхронизации")
logger.info("✅ Sync токены очищены, будет выполнена полная пересинхронизация")
except Exception as e:
logger.warning(f"⚠️ Ошибка очистки sync tokens: {e}")
# ===== HTTP API обработчики =====
async def send_message_handler(request):
"""HTTP endpoint для отправки сообщения в Matrix от n8n"""
try:
data = await request.json()
room_id = data.get("room_id")
message = data.get("message")
reply_to_event_id = data.get("reply_to_event_id")
sender_id = data.get("sender_id")
sender_name = data.get("sender_name")
if not room_id or not message:
return web.json_response(
{"error": "room_id and message required"},
status=400
)
logger.info(f"📤 Получен запрос отправить сообщение в комнату {room_id}")
logger.info(f"💬 Текст: {message[:100]}")
if reply_to_event_id:
logger.info(f"↩️ Reply to event: {reply_to_event_id}")
if sender_id:
logger.info(f"👤 Mention user: {sender_name} ({sender_id})")
# Формируем content сообщения
content = {
"msgtype": "m.text",
"body": message
}
# Если есть sender_id, добавляем HTML mention
if sender_id and sender_name:
content["format"] = "org.matrix.custom.html"
content["formatted_body"] = f'{sender_name}: {message}'
# Добавляем reply-to если указан event_id
if reply_to_event_id:
content["m.relates_to"] = {
"m.in_reply_to": {
"event_id": reply_to_event_id
}
}
if "formatted_body" not in content:
content["format"] = "org.matrix.custom.html"
content["formatted_body"] = message
content["formatted_body"] = f'In reply to
{content["formatted_body"]}'
# Отправляем сообщение в Matrix
try:
await client.room_send(
room_id=room_id,
message_type="m.room.message",
content=content
)
logger.info(f"✅ Сообщение отправлено в {room_id}")
return web.json_response({"status": "ok", "room_id": room_id})
except LocalProtocolError as e:
logger.error(f"❌ Ошибка отправки (protocol): {e}")
return web.json_response(
{"error": f"Protocol error: {str(e)}"},
status=500
)
except Exception as e:
logger.error(f"❌ Ошибка отправки: {e}")
return web.json_response(
{"error": f"Send error: {str(e)}"},
status=500
)
except json.JSONDecodeError:
return web.json_response(
{"error": "Invalid JSON"},
status=400
)
except Exception as e:
logger.error(f"❌ Ошибка в handler: {e}")
return web.json_response(
{"error": str(e)},
status=500
)
async def health_handler(request):
"""Health check endpoint"""
is_connected = client is not None and client.logged_in if client else False
return web.json_response({
"status": "ok",
"connected": is_connected,
"timestamp": asyncio.get_event_loop().time()
})
# ===== Функции =====
def save_credentials(device_id, access_token):
"""Сохраняет учётные данные в файл"""
try:
Path(STORE_PATH).mkdir(parents=True, exist_ok=True)
with open(CREDENTIALS_FILE, "w") as f:
json.dump({
"device_id": device_id,
"access_token": access_token
}, f)
logger.info(f"✅ Credentials сохранены в {CREDENTIALS_FILE}")
except Exception as e:
logger.error(f"❌ Ошибка сохранения credentials: {e}")
def load_credentials():
"""Загружает учётные данные из файла"""
try:
if os.path.exists(CREDENTIALS_FILE):
with open(CREDENTIALS_FILE, "r") as f:
creds = json.load(f)
logger.info(f"✅ Credentials загружены: device_id={creds.get('device_id')}")
return creds.get("device_id"), creds.get("access_token")
except Exception as e:
logger.warning(f"Не удалось загрузить credentials: {e}")
return None, None
async def send_to_n8n(text, room_id, sender, is_dm, sender_name=None, event_id=None):
"""Отправляет сообщение в n8n webhook для обработки AI"""
payload = {
"text": text,
"room_id": room_id,
"sender": sender,
"sender_name": sender_name or sender,
"is_dm": is_dm,
"event_id": event_id
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
N8N_WEBHOOK_URL,
json=payload,
timeout=aiohttp.ClientTimeout(total=N8N_TIMEOUT)
) as resp:
if resp.status == 200:
result = await resp.json()
logger.info(f"✅ n8n обработал сообщение")
return result
else:
text_resp = await resp.text()
logger.error(f"❌ n8n вернул ошибку {resp.status}: {text_resp[:200]}")
return None
except asyncio.TimeoutError:
logger.error(f"⏱️ Timeout при обращении к n8n (>{N8N_TIMEOUT}s)")
return None
except Exception as e:
logger.error(f"❌ Ошибка при отправке в n8n: {e}")
return None
async def message_callback(room, event):
"""Обработка текстовых сообщений"""
logger.info(f"📨 Event type: {type(event).__name__}, sender: {event.sender}, room: {room.room_id}")
# Расшифровка Megolm-сообщений
if isinstance(event, MegolmEvent):
logger.info(f"🔐 Decrypting megolm event...")
try:
event = await client.decrypt_event(event)
if not event:
logger.warning(f"Failed to decrypt event in {room.room_id}")
return
logger.info(f"✅ Decrypted successfully")
except Exception as e:
logger.error(f"Decrypt error: {e}")
return
if not isinstance(event, RoomMessageText):
logger.debug(f"Ignoring non-text event: {type(event).__name__}")
return
if event.sender == client.user_id:
logger.debug(f"Ignoring own message")
return
text = (event.body or "").strip()
if not text:
return
logger.info(f"💬 Message text: {text[:100]}")
logger.info(f"🆔 Event ID: {event.event_id}")
# ===== ИСПРАВЛЕНО: Правильная обработка None в member_count =====
member_count = getattr(room, 'member_count', None)
# Если member_count None, пробуем другие способы
if member_count is None or member_count == 0:
member_count = len(room.users) if hasattr(room, 'users') and room.users else 0
if member_count == 0 and hasattr(room, 'joined_count'):
member_count = getattr(room, 'joined_count', 0)
if member_count == 0 and hasattr(room, 'summary'):
member_count = getattr(room.summary, 'joined_member_count', 0)
# ===== КРИТИЧНО: Проверка что member_count не None перед сравнением =====
# Если всё ещё None (после перезагрузки/бекапа), используем default значение
if member_count is None:
logger.warning(f"⚠️ member_count is None (после перезагрузки?), используем default=2 (DM)")
member_count = 2
logger.info(f"🔍 Room info: members={member_count}, name={room.display_name or room.name or 'None'}")
# Определяем тип комнаты
if member_count == 2:
is_dm = True
logger.info(f"🔍 Room type: DM (2 members)")
elif member_count <= 1: # ← Теперь member_count никогда не будет None!
is_dm = True
logger.info(f"🔍 Room type: DM (fallback, members={member_count})")
else:
is_dm = False
logger.info(f"🔍 Room type: GROUP ({member_count} members)")
# Проверяем нужно ли отвечать
if is_dm:
cleaned = text
logger.info(f"💬 DM message, sending to n8n")
else:
# В группе только на обращение
if not text.lower().startswith(f"{BOT_NICKNAME.lower()}:"):
logger.info(f"⏭️ Ignoring group message without mention")
return
cleaned = text[len(f"{BOT_NICKNAME}:"):].strip()
if not cleaned:
return
logger.info(f"💬 Group mention detected, sending to n8n")
# Отправляем сообщение в n8n для обработки AI
logger.info(f"🚀 Sending to n8n webhook...")
# Получаем имя отправителя из комнаты
sender_name = None
if event.sender in room.users:
user = room.users[event.sender]
sender_name = user.name or event.sender
result = await send_to_n8n(cleaned, room.room_id, event.sender, is_dm, sender_name, event.event_id)
if not result:
fallback_reply = "Извините, возникла ошибка при обработке вашего сообщения. Попробуйте позже."
try:
await client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={"msgtype": "m.text", "body": fallback_reply}
)
logger.warning(f"📤 Отправлен fallback ответ")
except Exception as e:
logger.error(f"❌ Ошибка отправки fallback: {e}")
async def invite_callback(room, event):
"""Обработка приглашений в комнаты"""
try:
await client.join(room.room_id)
logger.info(f"✅ Присоединился к комнате: {room.room_id}")
except Exception as e:
logger.exception(f"❌ Ошибка join: {e}")
async def member_callback(room, event):
"""Обработка событий присоединения/выхода пользователей"""
try:
if event.membership != "join":
return
if event.state_key == client.user_id:
logger.debug(f"Ignoring own membership event")
return
room_name = room.display_name or room.name or ""
logger.info(f"👥 Member event in room: {room_name} ({room.room_id})")
if room_name.lower() != "общая":
logger.debug(f"Skipping welcome message - not 'Общая' room")
return
new_user_id = event.state_key
new_user_name = new_user_id
if new_user_id in room.users:
user = room.users[new_user_id]
new_user_name = user.name or new_user_id
logger.info(f"👋 Sending welcome message to {new_user_name} in {room_name}")
plain_message = f"{new_user_name}: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя."
html_message = f'{new_user_name}: Добро пожаловать в чат! Я AI ChatBot и могу помочь с различными вопросами, если обратиться ко мне (в чате необходимо нажать на мое имя что бы написать мне вопрос). Приватные чаты шифруются. Срок хранения переписки на сервере одна неделя.'
await client.room_send(
room_id=room.room_id,
message_type="m.room.message",
content={
"msgtype": "m.text",
"body": plain_message,
"format": "org.matrix.custom.html",
"formatted_body": html_message
}
)
logger.info(f"✅ Welcome message sent to {new_user_name}")
except Exception as e:
logger.error(f"❌ Ошибка в member_callback: {e}")
logger.exception(e)
async def to_device_callback(event):
"""Обработка to-device сообщений для обмена ключами"""
logger.debug(f"To-device event: {type(event).__name__}")
# ===== HTTP сервер =====
async def start_http_server():
"""Запускает HTTP API сервер для получения ответов от n8n"""
global app, http_runner
app = web.Application()
app.router.add_post('/send_message', send_message_handler)
app.router.add_get('/health', health_handler)
http_runner = web.AppRunner(app)
await http_runner.setup()
site = web.TCPSite(http_runner, '0.0.0.0', HTTP_API_PORT)
await site.start()
logger.info(f"🌐 HTTP API запущен на порту {HTTP_API_PORT}")
logger.info(f"📡 Endpoints: /send_message (POST), /health (GET)")
# ===== Основной цикл =====
async def main():
global client, sync_error_count
logger.info(f"🚀 Запуск Matrix бота")
logger.info(f"🏠 Homeserver: {HOMESERVER}")
logger.info(f"👤 User: {USER}")
logger.info(f"📡 n8n webhook: {N8N_WEBHOOK_URL}")
# Загружаем сохранённые credentials
device_id, access_token = load_credentials()
# Создаём клиента с сохранёнными данными
if device_id and access_token:
client = AsyncClient(
HOMESERVER,
USER,
config=config,
store_path=STORE_PATH,
device_id=device_id
)
client.access_token = access_token
client.user_id = USER
logger.info(f"✅ Используем существующую сессию, device_id: {device_id}")
else:
client = AsyncClient(HOMESERVER, USER, config=config, store_path=STORE_PATH)
client.user_id = USER
logger.info("Выполняется первичный логин...")
if not PASSWORD:
logger.error("❌ MATRIX_PASSWORD не установлен в .env")
return
login_resp = await client.login(PASSWORD, device_name="matrix-bot")
if not isinstance(login_resp, LoginResponse):
logger.error(f"❌ Login failed: {login_resp}")
return
logger.info(f"✅ Залогинились как {USER}, device_id: {client.device_id}")
save_credentials(client.device_id, client.access_token)
# Загрузка crypto store
try:
client.load_store()
logger.info("✅ Crypto store загружен")
except Exception as e:
logger.warning(f"Crypto store не загружен: {e}")
# ===== НОВОЕ: Первичная синхронизация с обработкой next_batch ошибок =====
logger.info("⏳ Выполняется первичная синхронизация...")
max_sync_retries = 3
for sync_attempt in range(max_sync_retries):
try:
sync_response = await client.sync(timeout=30000, full_state=True)
logger.info(f"✅ Первичная синхронизация завершена")
sync_error_count = 0
break
except Exception as e:
error_str = str(e)
sync_attempt_num = sync_attempt + 1
# Проверяем на ошибку next_batch
if "next_batch" in error_str or "'next_batch' is a required property" in error_str:
logger.error(f"❌ Sync error (попытка {sync_attempt_num}/{max_sync_retries}): next_batch missing")
logger.warning(f"🔄 Это происходит после перезагрузки - Matrix сервер вернул некорректный ответ")
if sync_attempt_num < max_sync_retries:
logger.info(f"🗑️ Очищаю sync токены и пробую ещё раз...")
clear_sync_tokens()
await asyncio.sleep(5) # Дождёмся перед следующей попыткой
else:
logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток")
logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...")
break
else:
logger.error(f"❌ Ошибка синхронизации (попытка {sync_attempt_num}/{max_sync_retries}): {e}")
if sync_attempt_num < max_sync_retries:
await asyncio.sleep(5)
else:
logger.error(f"❌ Не удалось выполнить синхронизацию после {max_sync_retries} попыток")
logger.info("💡 Попробуем продолжить и восстановиться в основном цикле...")
break
# Загрузка ключей шифрования при необходимости
if client.should_upload_keys:
logger.info("📤 Загрузка ключей шифрования...")
try:
await client.keys_upload()
logger.info("✅ Ключи шифрования загружены")
except Exception as e:
logger.warning(f"⚠️ Ошибка загрузки ключей: {e}")
# Запрос ключей для шифрованных комнат
logger.info("🔑 Запрос ключей для комнат...")
try:
await client.keys_query()
shared_count = 0
for room_id, room in client.rooms.items():
if room.encrypted:
try:
await client.share_group_session(room_id, ignore_unverified_devices=True)
shared_count += 1
except Exception as e:
logger.warning(f"⚠️ Share session error for {room_id}: {e}")
if shared_count > 0:
logger.info(f"🔐 Ключи общей сессии переданы для {shared_count} шифрованных комнат")
except Exception as e:
if "No key query required" not in str(e):
logger.warning(f"⚠️ Keys query error: {e}")
# Регистрируем callbacks для новых событий
client.add_event_callback(message_callback, RoomMessageText)
client.add_event_callback(message_callback, MegolmEvent)
client.add_event_callback(invite_callback, InviteMemberEvent)
client.add_event_callback(member_callback, RoomMemberEvent)
client.add_to_device_callback(to_device_callback,
(KeyVerificationStart,
KeyVerificationCancel,
KeyVerificationKey,
KeyVerificationMac))
# Запускаем HTTP сервер
await start_http_server()
logger.info(f"🤖 Бот полностью готов к работе!")
logger.info(f"✅ Listening for messages...")
try:
# ===== НОВОЕ: Основной цикл с обработкой ошибок next_batch =====
consecutive_errors = 0
while True:
try:
await client.sync(timeout=30000)
consecutive_errors = 0 # Reset при успехе
except Exception as e:
error_str = str(e)
consecutive_errors += 1
# Проверяем на ошибку next_batch
if "next_batch" in error_str or "'next_batch' is a required property" in error_str:
logger.warning(f"⚠️ Sync error #{consecutive_errors}: next_batch missing (обычно после перезагрузки)")
if consecutive_errors == 1:
logger.info("🗑️ Первая ошибка - очищаю sync токены...")
clear_sync_tokens()
# Экспоненциальная задержка: 5s, 10s, 20s, 40s...
delay = min(5 * (2 ** (consecutive_errors - 1)), 120)
logger.info(f"⏳ Жду {delay}s перед повторной попыткой...")
await asyncio.sleep(delay)
if consecutive_errors > 5:
logger.error("❌ Слишком много consecutive ошибок (>5), контейнер перезагружается...")
break
else:
# Другие ошибки
logger.error(f"❌ Ошибка sync #{consecutive_errors}: {e}")
if consecutive_errors > 3:
logger.warning(f"⚠️ {consecutive_errors} ошибок подряд, даю больше времени...")
await asyncio.sleep(30)
else:
await asyncio.sleep(10)
except KeyboardInterrupt:
logger.info("⏹️ Получен сигнал завершения")
finally:
if http_runner:
await http_runner.cleanup()
if client:
await client.close()
logger.info("👋 Бот остановлен")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Bot остановлен пользователем")
except Exception as e:
logger.exception(f"❌ Fatal error: {e}")
==== Запуск бота ====
# Создаём .env файл с конфигурацией
cat > .env << EOF
MATRIX_HOMESERVER=https://syna.digitizepro.tech
MATRIX_USER=@bot:syna.digitizepro.tech
MATRIX_PASSWORD=your_password
PICKLE_KEY=your_encryption_key
N8N_WEBHOOK_URL=http://n8n:5678/webhook/matrix-bot-ai
N8N_TIMEOUT=30
HTTP_API_PORT=5000
EOF
# Запускаем бот
python matrix_bot.py
==== Использование с Docker ====
**requirements.txt:**
python-nio>=0.24.0
aiohttp>=3.8.0
python-dotenv>=0.21.0
**Dockerfile:**
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY matrix_bot.py .
CMD ["python", "matrix_bot.py"]
==== Основные функции ====
==== Message Callback ====
Обрабатывает текстовые сообщения:
* Расшифровывает Megolm-сообщения (из шифрованных комнат)
* Определяет тип комнаты (DM или GROUP)
* В DM отвечает на все сообщения
* В GROUP отвечает только на обращение (''ChatBot: вопрос'')
* Отправляет в n8n webhook
==== Send to n8n ====
Отправляет сообщение в n8n:
* text - очищенный текст
* room_id - ID комнаты
* sender - Matrix ID отправителя
* sender_name - Имя отправителя
* is_dm - флаг типа комнаты
* event_id - ID события
==== Send Message Handler ====
HTTP endpoint для ответов от n8n:
* Получает сообщение и параметры
* Добавляет HTML mention если есть sender_id
* Отправляет в Matrix комнату
===== Технические детали ======
==== Порядок сообщений в контексте ====
Query Memory возвращает сообщения в порядке DESC (новые сверху), но Build Context Code переворачивает их так чтобы **новые сообщения были в конце контекста**. Это важно чтобы AI правильно понял что является "последним" сообщением.
==== Экранирование SQL ====
В Save and Send используется функция esc() которая экранирует одинарные кавычки ('' → '''') чтобы избежать SQL инъекций.
==== Fallback значения ====
Если sender_name не пришел, используется sender ID
Если сообщение не пришло, используется пустая строка
Все это обрабатывается в Build Context Code и Save and Send
===== Дополнительная информация ======
**Версия Workflow:** 236
**Последнее обновление:** 2025-11-29
**Статус:** Production Ready ✅
**Contacts:** Если возникли вопросы по workflow, смотрите логи в Executions