Conversation Persistence
I guess the simpliest way to implement it is looking at PicklePersistence()
.
The only example I've seen of the dictionary is conversations = { name : { (user_id,user_id): state} }
where name
is the one given to ConversationHandler()
, the tuple-as-a-key (user_id,user_id)
is the user_id
to whom your bot is talking to and state
is the state of the conversation. Ok, maybe one isn't user_id
, maybe is chat_id
but I can't say for sure, I need more guinea pigs.
To handle the tuple-as-a-key, python-telegram-bot includes some tools to help you handle that: encode_conversations_to_json
and decode_conversations_from_json
.
Here, on_flush
is a variable to tell the code if you want to save everytime there is a call to update_conversation()
when is set to False
or only when exiting the program when is set to True
One last detail: for now the following code only saves and retrieve from the database but there is no replacing nor deleting.
from telegram.ext import BasePersistence
from config import mongo_URI
from copy import deepcopy
from telegram.utils.helpers import decode_conversations_from_json, encode_conversations_to_json
import mongoengine
import json
from bson import json_util
class Conversations(mongoengine.Document):
obj = mongoengine.DictField()
meta = { 'collection': 'Conversations', 'ordering': ['-id']}
class MongoPersistence(BasePersistence):
def __init__(self):
super(MongoPersistence, self).__init__(store_user_data=False,
store_chat_data=False,
store_bot_data=False)
dbname = "persistencedb"
mongoengine.connect(host=mongo_URI, db=dbname)
self.conversation_collection = "Conversations"
self.conversations = None
self.on_flush = False
def get_conversations(self, name):
if self.conversations:
pass
else:
document = Conversations.objects()
if document.first() == None:
document = {}
else:
document = document.first()['obj']
conversations_json = json_util.dumps(document)
self.conversations = decode_conversations_from_json(conversations_json)
return self.conversations.get(name, {}).copy()
def update_conversation(self, name, key, new_state):
if self.conversations.setdefault(name, {}).get(key) == new_state:
return
self.conversations[name][key] = new_state
if not self.on_flush:
conversations_dic = json_util.loads(encode_conversations_to_json(self.conversations))
document = Conversations(obj=conversations_dic)
document.save()
def flush(self):
conversations_dic = json_util.loads(encode_conversations_to_json(self.conversations))
document = Conversations(obj=conversations_dic)
document.save()
mongoengine.disconnect()
BEWARE! Sometimes the conversations requires to user previosly setted user_data
and this code doesn't provide it as requested.
All Persistence
Here is a more complete code (still lack the replace document in the database).
from telegram.ext import BasePersistence
from collections import defaultdict
from config import mongo_URI
from copy import deepcopy
from telegram.utils.helpers import decode_user_chat_data_from_json, decode_conversations_from_json, encode_conversations_to_json
import mongoengine
import json
from bson import json_util
class Conversations(mongoengine.Document):
obj = mongoengine.DictField()
meta = { 'collection': 'Conversations', 'ordering': ['-id']}
class UserData(mongoengine.Document):
obj = mongoengine.DictField()
meta = { 'collection': 'UserData', 'ordering': ['-id']}
class ChatData(mongoengine.Document):
obj = mongoengine.DictField()
meta = { 'collection': 'ChatData', 'ordering': ['-id']}
class BotData(mongoengine.Document):
obj = mongoengine.DictField()
meta = { 'collection': 'BotData', 'ordering': ['-id']}
class DBHelper():
"""Class to add and get documents from a mongo database using mongoengine
"""
def __init__(self, dbname="persistencedb"):
mongoengine.connect(host=mongo_URI, db=dbname)
def add_item(self, data, collection):
if collection == "Conversations":
document = Conversations(obj=data)
elif collection == "UserData":
document = UserData(obj=data)
elif collection == "chat_data_collection":
document = ChatData(obj=data)
else:
document = BotData(obj=data)
document.save()
def get_item(self, collection):
if collection == "Conversations":
document = Conversations.objects()
elif collection == "UserData":
document = UserData.objects()
elif collection == "ChatData":
document = ChatData.objects()
else:
document = BotData.objects()
if document.first() == None:
document = {}
else:
document = document.first()['obj']
return document
def close(self):
mongoengine.disconnect()
class DBPersistence(BasePersistence):
"""Uses DBHelper to make the bot persistant on a database.
It's heavily inspired on PicklePersistence from python-telegram-bot
"""
def __init__(self):
super(DBPersistence, self).__init__(store_user_data=True,
store_chat_data=True,
store_bot_data=True)
self.persistdb = "persistancedb"
self.conversation_collection = "Conversations"
self.user_data_collection = "UserData"
self.chat_data_collection = "ChatData"
self.bot_data_collection = "BotData"
self.db = DBHelper()
self.user_data = None
self.chat_data = None
self.bot_data = None
self.conversations = None
self.on_flush = False
def get_conversations(self, name):
if self.conversations:
pass
else:
conversations_json = json_util.dumps(self.db.get_item(self.conversation_collection))
self.conversations = decode_conversations_from_json(conversations_json)
return self.conversations.get(name, {}).copy()
def update_conversation(self, name, key, new_state):
if self.conversations.setdefault(name, {}).get(key) == new_state:
return
self.conversations[name][key] = new_state
if not self.on_flush:
conversations_json = json_util.loads(encode_conversations_to_json(self.conversations))
self.db.add_item(conversations_json, self.conversation_collection)
def get_user_data(self):
if self.user_data:
pass
else:
user_data_json = json_util.dumps(self.db.get_item(self.user_data_collection))
if user_data_json != '{}':
self.user_data = decode_user_chat_data_from_json(user_data_json)
else:
self.user_data = defaultdict(dict,{})
return deepcopy(self.user_data)
def update_user_data(self, user_id, data):
if self.user_data is None:
self.user_data = defaultdict(dict)
# comment next line if you want to save to db every time this function is called
if self.user_data.get(user_id) == data:
return
self.user_data[user_id] = data
if not self.on_flush:
user_data_json = json_util.loads(json.dumps(self.user_data))
self.db.add_item(user_data_json, self.user_data_collection)
def get_chat_data(self):
if self.chat_data:
pass
else:
chat_data_json = json_util.dumps(self.db.get_item(self.chat_data_collection))
if chat_data_json != "{}":
self.chat_data = decode_user_chat_data_from_json(chat_data_json)
else:
self.chat_data = defaultdict(dict,{})
return deepcopy(self.chat_data)
def update_chat_data(self, chat_id, data):
if self.chat_data is None:
self.chat_data = defaultdict(dict)
# comment next line if you want to save to db every time this function is called
if self.chat_data.get(chat_id) == data:
return
self.chat_data[chat_id] = data
if not self.on_flush:
chat_data_json = json_util.loads(json.dumps(self.chat_data))
self.db.add_item(chat_data_json, self.chat_data_collection)
def get_bot_data(self):
if self.bot_data:
pass
else:
bot_data_json = json_util.dumps(self.db.get_item(self.bot_data_collection))
self.bot_data = json.loads(bot_data_json)
return deepcopy(self.bot_data)
def update_bot_data(self, data):
if self.bot_data == data:
return
self.bot_data = data.copy()
if not self.on_flush:
bot_data_json = json_util.loads(json.dumps(self.bot_data))
self.db.add_item(self.bot_data, self.bot_data_collection)
def flush(self):
if self.conversations:
conversations_json = json_util.loads(encode_conversations_to_json(self.conversations))
self.db.add_item(conversations_json, self.conversation_collection)
if self.user_data:
user_data_json = json_util.loads(json.dumps(self.user_data))
self.db.add_item(user_data_json, self.user_data_collection)
if self.chat_data:
chat_data_json = json_util.loads(json.dumps(self.chat_data))
self.db.add_item(chat_data_json, self.chat_data_collection)
if self.bot_data:
bot_data_json = json_util.loads(json.dumps(self.bot_data))
self.db.add_item(self.bot_data, self.bot_data_collection)
self.db.close()
Two details:
- Chat_data persistence hasn't been saving to the database for now. Needs more testing. Maybe that part of the code has a bug.
- For now the only part of the code where
on_flush = False
works is in Conversations. In all other updates it seems the call is done after the assignment so if variable[key] == data
is always True
and finish the code earlier than saving to database, that's why there is a comment saying # comment next line if you want to save to db every time this function is called
but makes a lot of savings. If you set on_flush = True
and the code stops earlier (the process is killed for example) you won't save anything on the database.