I needed this for a Slack-related project that feeds channel history to Gemini, and came up with this:
# Build one chronologically ordered list holding the channel's top-level
# messages plus every reply from any thread started in that window.
# A failed fetch is treated as "no messages" so the loop below never
# iterates over None.
channel_history = get_channel_messages(
    slack_client, channel_id, oldest=search_timestamp
) or []

# conversations.replies returns the thread's parent message along with its
# replies, and the parent is already in channel_history. Track the ts values
# we already hold so no message ends up in the final history twice.
seen_ts = {message["ts"] for message in channel_history}

# More than one channel message can carry the same thread_ts; fetch each
# thread only once.
fetched_threads = set()

thread_history = []
for history_message in channel_history:
    thread_ts = history_message.get("thread_ts")
    if thread_ts is None or thread_ts in fetched_threads:
        continue
    fetched_threads.add(thread_ts)

    thread_messages = get_message_thread(
        slack_client,
        channel_id=channel_id,
        thread_ts=thread_ts,
    ) or []
    for thread_message in thread_messages:
        if thread_message["ts"] in seen_ts:
            continue
        seen_ts.add(thread_message["ts"])
        thread_history.append(thread_message)

# add the threads
channel_history.extend(thread_history)
# sort it all by ts
sorted_history = sorted(channel_history, key=itemgetter("ts"))
where get_channel_messages and get_message_thread are defined as follows:
def get_channel_messages(slack_client, channel_id, message_count=0, oldest=1):
    """Return the messages of a channel, following cursor pagination.

    Calls ``conversations_history`` on ``slack_client`` and keeps requesting
    further pages for as long as the response carries a non-empty
    ``response_metadata.next_cursor``.

    Args:
        slack_client: Client object exposing ``conversations_history``.
        channel_id: ID of the channel to read.
        message_count: Forwarded to the API as ``limit`` on every request.
        oldest: Forwarded to the API as ``oldest`` on every request, so the
            lower time bound also applies to follow-up pages.

    Returns:
        A list of message dicts in the order the API returned them. If a
        ``SlackApiError`` is raised, the error is logged and whatever was
        collected before the failure (possibly an empty list) is returned.
    """
    conversation_history = []
    try:
        # First page.
        result = slack_client.conversations_history(
            channel=channel_id, limit=message_count, oldest=oldest
        )
        conversation_history.extend(result.get("messages") or [])

        while True:
            # Read the cursor straight from the response; an absent or empty
            # cursor means there are no further pages.
            cursor = (result.get("response_metadata") or {}).get("next_cursor")
            if not cursor:
                break
            logger.debug("CURSOR: %s", cursor)
            # The cursor already marks where the next page starts, so no
            # `latest` is derived from the previous page (indexing into it
            # fails on pages with fewer than two messages).
            result = slack_client.conversations_history(
                channel=channel_id,
                limit=message_count,
                oldest=oldest,
                cursor=cursor,
            )
            conversation_history.extend(result.get("messages") or [])

        logger.debug(
            "%s messages found in %s", len(conversation_history), channel_id
        )
    except SlackApiError as e:
        logger.error("Error accessing history: %s", e)
    # Always hand back a list so callers can iterate without a None check.
    return conversation_history
def get_message_thread(slack_client, channel_id, thread_ts):
    """Return the messages of one thread, following cursor pagination.

    Calls ``conversations_replies`` on ``slack_client`` and keeps requesting
    further pages for as long as the response carries a non-empty
    ``response_metadata.next_cursor``.

    Args:
        slack_client: Client object exposing ``conversations_replies``.
        channel_id: ID of the channel that contains the thread.
        thread_ts: Timestamp identifying the thread, forwarded as ``ts``.

    Returns:
        A list of message dicts in the order the API returned them. If a
        ``SlackApiError`` is raised, the error is logged and whatever was
        collected before the failure (possibly an empty list) is returned.
    """
    thread_history = []
    try:
        # First page of the thread via conversations.replies.
        result = slack_client.conversations_replies(channel=channel_id, ts=thread_ts)
        thread_history.extend(result.get("messages") or [])

        while True:
            # Read the cursor straight from the response; an absent or empty
            # cursor means there are no further pages.
            cursor = (result.get("response_metadata") or {}).get("next_cursor")
            if not cursor:
                break
            logger.debug("CURSOR: %s", cursor)
            # The cursor already marks where the next page starts, so no
            # `latest` is derived from the previous page (indexing into it
            # fails on pages with fewer than two messages).
            result = slack_client.conversations_replies(
                channel=channel_id,
                ts=thread_ts,
                cursor=cursor,
            )
            thread_history.extend(result.get("messages") or [])

        logger.debug("%s messages found in %s", len(thread_history), thread_ts)
    except SlackApiError as e:
        logger.error("Error accessing history: %s", e)
    # Always hand back a list so callers can extend/iterate without a None check.
    return thread_history
You can see it in context in this open source project: https://github.com/jeffbryner/gcp-ai-slackbot/blob/main/code/source/main.py#L157