Scriptone Scriptone

自身のBlueskyのポストでブロードリスニングをする(Python)

概要

本記事では、Blueskyに自身が投稿したテキストデータを活用し、Pythonと生成AI、クラスタリング技術を用いて「ブロードリスニング」を行う方法を解説します。ブロードリスニングとは、大勢の声や多様な意見を効率的に集約・整理し、全体像や傾向を把握するための手法です。個人利用に特化した本記事では、著作権や利用規約に配慮しつつ、自分の投稿データだけを対象に分析を進めます。

ブロードリスニングとは

ブロードリスニングとは多くの人々の多様な声をもれなく聴くための手法の総称です。今まではブロードキャストでメディアやSNSを通じて企業や政治家、タレント、インフルエンサーが大勢に対して情報を発信してきました。反対に大勢の声を集約して整理整頓した形で見られる効率的な方法がなく、人の手で整理整頓をしてレポートにまとめる必要がありました。生成AIの登場により大勢の声を分析しやすいように前処理して、分類し、分類したテキストにラベル付をして、要約をするなどの一連の手順が効率的にできるようになり、膨大なテキストから全体や細部を見通せるようになりました。

国内においては東京都から「2050年代の東京どうなっていてほしい?」というテーマでブロードリスニングの分析結果やコードとプロンプトが公開されています。大衆から多様な声が集まって全体を俯瞰して見られる様子についてはぜひ東京都の事例をご覧いただくと良いかと思います。

今回おこなう分析

本記事ではBlueskyで自身が投稿しているテキストを使って、過去にどのようなことをやってきているのかの視点で前処理やクラスタリングを行います。仕事などであれば1次データや2次データなどアンケートのデータを使って、特定の目的で分析するなどもできるかもしれません。しかし、個人で同様に行おうとしたときに著作権の問題でテキストが扱いづらかったり再公開できなかったりなどの問題があります。しかしながら自身が投稿したBlueskyのポストであれば著作権の問題もなく加工して再公開することは問題ありません。ただし、2025年5月末時点の情報ですので、実際にBlueskyのポストを使ってブロードリスニングを行う場合にはBlueskyの利用規約に目を通していただきますようお願いいたします。

リポジトリ

リポジトリはGitHubで管理しています。README.mdにしたがってクローンをしてライブラリ等を導入してください。クラスタリング用のライブラリのために別途環境構築が必要となる可能性がありますので、uv syncでライブラリの導入に失敗する場合にはPerplexityなどで環境構築の調査をしてみてください。おそらくニッチな情報ではないので環境構築の手助けになるかと思います。 URL: https://github.com/rmc8/bsky_listening

コードの解説

ブロードリスニングのプログラムはGoogleのfireライブラリを使用してCLIで実行するように組まれています。実行方法はリポジトリのREADME.mdに記載がありますがuv run main.py allのようにコマンドを設定して実行します。xAIやBlueskyなど外部のAPIを使用するので、APIキーやアプリパスなどを事前に.evnやPCの環境変数に設定してください。CLIで実行する処理の大まかな流れは以下のとおりです。

  • main.py: 下記の機能をCLIで操作させる機能を提供する
    • fetch: Blueskyから自身のポストを取得する
    • preproc: ポストをブロードリスニングが行いやすい形に整形して、複数のトピックが含まれる場合1つずつになるように分割する
    • embedding: 前処理したテキストをベクトル化する(OllamaおよびOpenAIの任意のモデルで対応可能)
    • clustering: ベクトル化したデータをクラスタリングしX,Y座標とクラスタ番号を表示する
    • labeling: クラスタごとにクラスタの特徴を記した簡単なラベルを作る
    • chart: preproc, clustering, labelingのデータを使って散布図を作る
    • all: 上記のfetchからchartまでの一連の処理をすべて実行する

main.py

main.pyはブロードリスニングで必要な各ステップをCLIで操作できるようにしています。処理内容はBskyListeningクラスに集約されており、メソッドをCLIで実行することで各ステップを実行できるようにしています。設定ファイルはtoml形式で記述し、APIキーなどのシークレットな値は.envや環境変数で読み取るようにします。

import os
import tomllib
from datetime import datetime
from pathlib import Path
from typing import Any

import fire
import pandas as pd
from dotenv import load_dotenv


from libs import bsky, chart, preproc, embedding, clustering, labeling
from libs.file_name import FileName

load_dotenv()

now = datetime.now()
pj_name = f"{now:%Y%m%d}"


THIS_DIR = Path(__file__).parent
PROJECT_DIR = THIS_DIR / "project"


def get_config() -> dict[str, Any]:
    config_file = THIS_DIR / "config.toml"
    with config_file.open("rb") as b:
        return tomllib.load(b)


class BskyListening:
    """
    Bluesky ソーシャルメディアプラットフォームから特定のユーザーの投稿を取得し、
    その内容を分析・可視化するための一連の処理を提供するクラスです。
    """

    @staticmethod
    def fetch(pj_name: str = pj_name, limit: int = 500):
        """
        指定されたBlueskyアカウントから投稿データを取得し、TSV形式で保存します。
        リポストは除外されます。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
            limit (int): 取得する投稿の最大件数。デフォルトは500。
        """
        df = bsky.fetch(
            config=get_config(), app_pass=str(os.getenv("BSKY_APP_PASS")), limit=limit
        )
        io_dir = PROJECT_DIR.joinpath(pj_name)
        os.makedirs(io_dir, exist_ok=True)
        output_path = io_dir.joinpath(FileName.bsky_posts.value)
        df.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def preproc(pj_name: str = pj_name):
        """
        取得した投稿テキストから主要なトピックを抽出し、分析しやすいように整形します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        input_path = io_dir.joinpath(FileName.bsky_posts.value)
        idf = pd.read_csv(input_path, sep="\t")
        odf = preproc.preproc(
            config=get_config(),
            api_key=str(os.getenv("XAI_API_KEY")),
            idf=idf,
        )
        output_path = io_dir.joinpath(FileName.preproc.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def _embedding_by_openai(idf: pd.DataFrame) -> pd.DataFrame:
        """
        OpenAIモデルを使用して埋め込みを生成します。

        Args:
            idf (pd.DataFrame): 前処理されたトピックデータを含むDataFrame。

        Returns:
            pd.DataFrame: 埋め込みデータを含むDataFrame。
        """
        return embedding.embed_by_openai(
            config=get_config(),
            api_key=str(os.getenv("OPENAI_API_KEY")),
            idf=idf,
        )

    @staticmethod
    def _embedding_by_ollama(idf: pd.DataFrame) -> pd.DataFrame:
        """
        Ollamaモデルを使用して埋め込みを生成します。

        Args:
            idf (pd.DataFrame): 前処理されたトピックデータを含むDataFrame。

        Returns:
            pd.DataFrame: 埋め込みデータを含むDataFrame。
        """
        return embedding.embed_by_ollama(
            config=get_config(),
            idf=idf,
        )

    @classmethod
    def embedding(cls, pj_name: str = pj_name, is_local: bool = True):
        """
        前処理されたトピックデータから、機械学習モデル(OpenAIまたはOllama)を使用して
        数値ベクトル(埋め込み)を生成します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
            is_local (bool): ローカルのOllamaモデルを使用するかどうか。デフォルトはTrue。
                             Falseの場合、OpenAIモデルを使用します。
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        input_path = io_dir.joinpath(FileName.preproc.value)
        idf = pd.read_csv(input_path, sep="\t")
        embedding_func = (
            cls._embedding_by_ollama if is_local else cls._embedding_by_openai
        )
        odf = embedding_func(idf)
        output_path = io_dir.joinpath(FileName.embedding.value)
        odf.to_pickle(output_path)

    @staticmethod
    def clustering(pj_name: str = pj_name):
        """
        生成された埋め込みデータを用いて、投稿を類似性に基づいてクラスタリングします。
        UMAP、HDBSCAN、BERTopic、Spectral Clusteringを組み合わせて使用します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        embedding_path = io_dir.joinpath(FileName.embedding.value)
        idf = pd.read_csv(preproc_path, sep="\t")
        edf = pd.read_pickle(embedding_path)
        odf = clustering.clustering(
            config=get_config(),
            idf=idf,
            edf=edf,
        )
        output_path = io_dir.joinpath(FileName.clustering.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def labeling(pj_name: str = pj_name, threshold: float = 0.70):
        """
        クラスタリングされた各グループに対して、その特徴をもっともよく表すラベルを生成します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
            threshold (float): ラベル生成に使用するデータの確率閾値。デフォルトは0.70。
                               この値以上の確率を持つデータのみがラベル生成に使用されます。
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        clustering_path = io_dir.joinpath(FileName.clustering.value)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        pdf = pd.read_csv(preproc_path, sep="\t")
        cdf = pd.read_csv(clustering_path, sep="\t")
        idf = pd.merge(pdf, cdf, on=["index"], how="left")
        odf = labeling.labeling(
            config=get_config(),
            api_key=str(os.getenv("XAI_API_KEY")),
            idf=idf,
            threshold=threshold,
        )
        output_path = io_dir.joinpath(FileName.labeling.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def chart(pj_name: str = pj_name, full_html: bool = True):
        """
        クラスタリングとラベリングの結果を基に、インタラクティブな散布図(HTML形式)を生成します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
            full_html (bool): チャートを完全なHTMLとして出力するかどうか。デフォルトはTrue。
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        clustering_path = io_dir.joinpath(FileName.clustering.value)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        pdf = pd.read_csv(preproc_path, sep="\t")
        cdf = pd.read_csv(clustering_path, sep="\t")
        idf = pd.merge(pdf, cdf, on=["index"], how="left")
        labeling_path = io_dir.joinpath(FileName.labeling.value)
        ldf = pd.read_csv(labeling_path, sep="\t")
        html = chart.chart(config=get_config(), pdf=idf, ldf=ldf, full_html=full_html)
        output_path = io_dir.joinpath(FileName.chart.value)
        with open(output_path, "w") as f:
            f.write(html)

    @classmethod
    def all(
        cls,
        pj_name: str = pj_name,
        limit: int = 500,
        is_local: bool = True,
        threshold: float = 0.70,
        full_html: bool = True,
    ):
        """
        Blueskyの投稿取得からチャート作成までの一連の処理をすべて実行します。

        Args:
            pj_name (str): プロジェクト名。デフォルトは現在日付 (YYYYMMDD)。
            limit (int): 取得する投稿の最大件数。デフォルトは500。
            is_local (bool): 埋め込み生成でローカルのOllamaモデルを使用するかどうか。
                             デフォルトはTrue。Falseの場合、OpenAIモデルを使用します。
            threshold (float): ラベル生成に使用するデータの確率閾値。デフォルトは0.70。
                               この値以上の確率を持つデータのみがラベル生成に使用されます。
            full_html (bool): チャートを完全なHTMLとして出力するかどうか。デフォルトはTrue。
        """
        cls.fetch(pj_name=pj_name, limit=limit)
        cls.preproc(pj_name=pj_name)
        cls.embedding(pj_name=pj_name, is_local=is_local)
        cls.clustering(pj_name=pj_name)
        cls.labeling(pj_name=pj_name, threshold=threshold)
        cls.chart(pj_name=pj_name, full_html=full_html)


def main():
    fire.Fire(BskyListening)


if __name__ == "__main__":
    main()

fetch

fetchでは分析対象となるBlueskyのポストを取得します。tomlにBlueskyのアカウントのハンドルを記述して、アプリパスワードを環境変数に設定することで指定のハンドルのポストをlimitを迎えるかすべてのポストを取得するかするまで、取得の操作を続けます。取得したデータはuri, cid, text, create_atの4列構成で出力します。

import logging
from time import sleep
from typing import Any


from atproto import Client
from pandas import DataFrame
from retry import retry


logger = logging.getLogger(__name__)


@retry(tries=5, delay=6.0)
def _get_timeline(c: Client, actor: str, cursor: str | None):
    res = c.get_author_feed(actor=actor, cursor=cursor, limit=100)
    return res


def fetch(config: dict[str, Any], app_pass: str, limit: int) -> DataFrame:
    logging.basicConfig(level=logging.INFO)
    handle = config["bluesky"]["handle"]
    c = Client()
    c.login(login=handle, password=app_pass)
    posts: list[dict[str, Any]] = []
    cursor: str | None = None
    while True:
        res = _get_timeline(c, handle, cursor)
        for feed_view in res.feed:
            post = feed_view.post
            if post.viewer is not None and post.viewer.repost is not None:
                continue
            posts.append(
                {
                    "uri": post.uri,
                    "cid": post.cid,
                    "text": post.record.text,
                    "created_at": post.record.created_at,
                }
            )
        cursor = post.record.created_at
        if res.feed:
            cursor = res.feed[-1].post.record.created_at
            logger.info("cursor: %s, len: %s", cursor, len(posts))
            continue
        if res.cursor is None or len(posts) >= limit or len(res.feed) == 0:
            break
        sleep(1.0)
    return DataFrame(posts)

ここの処理が一番、費用と時間がかかると思います。必要に応じて低価格(もしくは高速)なモデルにしたり、東京都の事例のように複数のスレッド(2~4個ぐらい)を立てて同時に処理させるなどすると費用削減や高速化ができます。

preproc

取得したポストを分析しやすいように整理整頓します。ポストを一つずつ読み込み、分析の目的に沿ってテキストを加工します。

import logging
import pandas as pd


from langchain.prompts import ChatPromptTemplate
from langchain_xai import ChatXAI
from pydantic import BaseModel, Field, SecretStr
from tqdm import tqdm


class TopicData(BaseModel):
    topics: list[str] = Field(
        ...,
        description="ユーザーのポストから抽出された主要な活動・行動トピックのリスト。複数の行動が含まれる場合は分割されます。",
    )


def _get_topics(config: dict, api_key: str, post: str):
    llm = ChatXAI(
        model=config["preproc"]["model"],
        api_key=SecretStr(api_key),
        temperature=0.0,
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                config["preproc"]["system_prompt"],
            ),
            (
                "human",
                post.replace("{", "{{").replace("}", "}}"),
            ),
        ]
    )
    chain = prompt | llm.with_structured_output(TopicData)
    res: TopicData = chain.invoke({})  # type: ignore
    return res.topics


def preproc(config: dict, api_key: str, idf: pd.DataFrame):
    logging.basicConfig(level=logging.WARNING)
    data = []
    index = 0
    idf = idf.dropna(subset=["text"])
    for rec in tqdm(idf.to_dict("records")):
        topics = _get_topics(config, api_key, rec["text"])
        for topic in topics:
            data.append(
                {
                    "index": index,
                    "cid": rec["cid"],
                    "topic": topic,
                }
            )
            index += 1
    return pd.DataFrame(data)

分析に置いてはxAIのモデルを使用しています。xAIは学習データとしてチャットを提供することで毎月$150の無料クレジットが提供されますのでとりあえずブロードリスニングを試す用途であれば費用を気にすることなく試せます。提供をしないことを選択すれば自費になりますが学習に使われることがなくAPIを活用できます。

モデルに渡すシステムプロンプトは下記のとおりになります。



[preproc]
model = "grok-3-fast-beta"
system_prompt = """
あなたはデータアナリストです。Blueskyのユーザーのポストをユーザーから受け取り、ブロードリスニングがしやすい用にテキストを加工する役割を持ちます。
今回はユーザーがどんなことをやっていたのかに着目してトピックを抽出したいです。やったことが複数含まれる場合にはやったことを1つずつに分割して分析しやすいようにしましょう。
その上で処理してリストに格納するテキストはユーザーがSNSにつぶやくようにして、原文の口調を変えずにユーザーらしい口調を維持して抽出するようにお願いいたします。該当するトピックが何もない場合には空のリストを返してください。

## 悪い例

ラズパイ5買った => ["ラズパイ5買ったよ!"] # 語尾を勝手に変えてはいけません
満腹😊病院の時間までのんびり読書! => ["のんびり読書してる!"] # 語尾を勝手に変えてはいけません
スーパーのコーヒー豆もらった => ["スーパーのコーヒー豆もらって嬉しい!"] # 嬉しいはポストに記載がなく事実に基づいていないのでNGです

## 良い例

満腹😊病院の時間までのんびり読書! => ["病院の時間までのんびり読書!"] # 原文を維持しつつやっていたことを端的に伝えているのでよい
手元のMCPでとりあえず書籍の検索とタスク管理の機能、細々としたものはあるけどまた少しずつ機能を追加してObsidianやClineに統合していきたい。 => ["MCPの機能を少しずつ追加してObsidianやClineに統合していきたい"] # やっていることを端的に抽出して語尾や意味を維持しているので良い
"""

(1)どのような目的でテキストを整形する必要があるかを示し、(2)分析しやすいように複合的な話題を1つの話題となるように分割することが重要かと思います。また、悪い例・良い例を使用して出力内容を意図通りに制御できます。自身のポストの傾向に合わせて例示をカスタマイズすると前処理がしやすいかと思いますので、fetchの段階でlimitを50程度に絞るなどして少量から出力の品質を評価すると良いでしょう。

embedding

Embeddingでは前処理で作ったテキストをベクトル化する処理をします。ベクトル化はOllamaとOpenAIに対応しています。GPUを積んでいるPCであればチャットモデルよりも計算量が少ないので快適にかつ無料でベクトル化ができます。GPUがない場合やOllamaの設定が面倒な場合にはOpenAIのAPIを使っても良いかと思います。OpenAIのAPIはxAIと違って有料ですが、埋め込みモデルはチャットモデルよりも非常に安いです。text-embedding-3-smallであれば100万トークンの出力で0.02ドル程度になりますので多量のテキストを流しても費用としてはこどものおやつ代にもなかなか達しないと思います。コードは下記のとおりです。

from langchain_ollama import OllamaEmbeddings
from langchain_openai import OpenAIEmbeddings
from pandas import DataFrame
from pydantic import SecretStr
from tqdm import tqdm


class Embedding:
    def __init__(self, model: OllamaEmbeddings | OpenAIEmbeddings) -> None:
        self.model = model

    def _embed(self, docs: list[str]) -> list[list[float]]:
        return self.model.embed_documents(docs)

    def run(self, idf: DataFrame, batch_size: int = 1000) -> DataFrame:
        embeddings = []
        for i in tqdm(range(0, len(idf), batch_size)):
            topics = idf["topic"].tolist()[i : i + batch_size]
            embeds = self._embed(topics)
            embeddings.extend(embeds)
        df = DataFrame(
            [
                {
                    "index": idf.iloc[i]["index"],
                    "cid": idf.iloc[i]["cid"],
                    "embedding": e,
                }
                for i, e in enumerate(embeddings)
            ]
        )
        return df


def embed_by_openai(config: dict, api_key: str, idf: DataFrame) -> DataFrame:
    emb_model = OpenAIEmbeddings(
        model=config["embedding"]["openai_model"],
        api_key=SecretStr(api_key),
    )
    e = Embedding(emb_model)
    return e.run(idf)


def embed_by_ollama(config: dict, idf: DataFrame) -> DataFrame:
    emb_model = OllamaEmbeddings(
        model=config["embedding"]["ollama_model"],
        base_url=config["embedding"]["ollama_base_url"],
    )
    e = Embedding(emb_model)
    return e.run(idf)

clustering

クラスタリングではベクトル化したデータを活用してクラスタリングをします。config.tomlで分割するクラスタの数を事前に設定して、クラスタリングを行います。

[clustering]
n_clusters = 44

コードは下記のとおりです。

import logging

import pandas as pd
import numpy as np
from janome.tokenizer import Tokenizer
from umap import UMAP
from hdbscan import HDBSCAN
from sklearn.feature_extraction.text import CountVectorizer
from bertopic import BERTopic

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

STOP_WORDS = [
    "の",
    "に",
    "は",
    "を",
    "た",
    "が",
    "で",
    "て",
    "と",
    "し",
    "れ",
    "さ",
    "ある",
    "いる",
    "も",
    "する",
    "から",
    "な",
    "こと",
    "として",
    "いく",
    "ない",
]
TOKENIZER = Tokenizer()


def _tokenize_japanese(text: str) -> list[str]:
    """
    日本語のテキストをJanomeでトークン化し、ストップワードを除外します。

    Args:
        text (str): トークン化する日本語のテキスト。

    Returns:
        list[str]: ストップワードが除外されたトークンのリスト。
    """
    return [
        token.surface
        for token in TOKENIZER.tokenize(text)
        if token.surface not in STOP_WORDS
    ]


def _perform_clustering(
    documents: list[str],
    embeddings: np.ndarray,
    metadata_dict: dict,
    min_cluster_size: int = 10,
    n_components: int = 2,
    num_topics: int = 6,
) -> pd.DataFrame:
    """
    与えられたドキュメントと埋め込みを使用して、UMAP、HDBSCAN、BERTopicを組み合わせた
    クラスタリングを実行します。クラスター数はユーザーが設定した固定値を使用します。

    Args:
        documents (list[str]): クラスタリング対象のドキュメントのリスト。
        embeddings (np.ndarray): ドキュメントに対応する埋め込みのNumPy配列。
        metadata_dict (dict): ドキュメントの追加メタデータを含む辞書。
                              結果DataFrameに結合されます。
        min_cluster_size (int, optional): HDBSCANの最小クラスターサイズ。デフォルトは2。
        n_components (int, optional): UMAPの次元削減後の次元数。デフォルトは2。
        num_topics (int, optional): BERTopicで生成するトピック(クラスター)の数。デフォルトは6。

    Returns:
        pd.DataFrame: クラスタリング結果を含むDataFrame。
                      各ドキュメントのインデックス、UMAP座標(x, y)、確率、クラスターIDが含まれます。

    Raises:
        Exception: クラスタリング処理中にエラーが発生した場合。
    """
    logger.info(f"Starting clustering with {len(documents)} documents")

    try:
        # UMAP次元削減モデルの初期化
        logger.info("Initializing UMAP dimensionality reduction model")
        umap_model = UMAP(
            random_state=42,
            n_components=n_components,
            n_jobs=-1,
        )

        # HDBSCANクラスタリングモデルの初期化
        logger.info("Initializing HDBSCAN clustering model")
        hdbscan_model = HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=1,
            core_dist_n_jobs=-1,
        )

        # CountVectorizerのセットアップ
        logger.info("Setting up CountVectorizer")
        vectorizer_model = CountVectorizer(tokenizer=_tokenize_japanese)

        # BERTopicモデルの初期化とクラスタ数の固定
        logger.info(f"Initializing BERTopic model with nr_topics={num_topics}")
        topic_model = BERTopic(
            umap_model=umap_model,
            hdbscan_model=hdbscan_model,
            vectorizer_model=vectorizer_model,
            verbose=True,
            nr_topics=num_topics,
        )

        # BERTopicモデルのフィットとトピック割り当ての取得
        logger.info("Fitting BERTopic model and getting topics")
        topics, probs = topic_model.fit_transform(documents, embeddings=embeddings)

        # UMAP座標を取得
        logger.info("Getting UMAP coordinates")
        umap_embeddings = topic_model.umap_model.transform(embeddings)

        # 結果DataFrameの生成
        logger.info("Generating result DataFrame")
        result_df = pd.DataFrame(
            {
                "index": metadata_dict["index"],
                "cid": metadata_dict["cid"],
                "x": umap_embeddings[:, 0],
                "y": umap_embeddings[:, 1],
                "probability": probs,
                "cluster-id": topics,
            }
        )

        logger.info(
            f"Clustering completed successfully with {len(set(result_df['cluster-id']))} clusters"
        )
        return result_df

    except Exception as e:
        logger.error(f"Error during clustering: {str(e)}", exc_info=True)
        raise


def clustering(config: dict, idf: pd.DataFrame, edf: pd.DataFrame) -> pd.DataFrame:
    """
    クラスタリングパイプラインのエントリポイント。
    設定、入力データフレーム、埋め込みデータフレームを受け取り、クラスタリング結果を返します。

    Args:
        config (dict): クラスタリング設定を含む辞書。
                       `config["clustering"]["n_clusters"]` でクラスター数を指定します。
        idf (pd.DataFrame): トピックとメタデータを含む入力DataFrame。
                                 "topic", "index", "cid" カラムが必要です。
        edf (pd.DataFrame): 埋め込みデータを含むDataFrame。
                                     "embedding" カラムが必要です。

    Returns:
        pd.DataFrame: クラスタリング結果を含むDataFrame。
                      各ドキュメントのインデックス、UMAP座標(x, y)、確率、クラスターIDが含まれます。

    Raises:
        Exception: クラスタリングパイプライン中にエラーが発生した場合。
    """
    logger.info("Starting clustering pipeline")
    try:
        logger.info("Loading topic data")
        documents_to_cluster = idf["topic"].values.tolist()
        document_embeddings = np.asarray(edf["embedding"].values.tolist())

        num_clusters = int(config["clustering"]["n_clusters"])
        logger.info(f"Configured for {num_clusters} clusters")

        # クラスタリングの実行
        clustering_result = _perform_clustering(
            documents=documents_to_cluster,
            embeddings=document_embeddings,
            metadata_dict={
                "index": idf["index"].values,
                "cid": idf["cid"].values,
            },
            min_cluster_size=10,
            num_topics=num_clusters,
        )
        logger.info("Clustering pipeline completed successfully")
        return clustering_result
    except Exception as e:
        logger.error(f"Error in clustering pipeline: {str(e)}", exc_info=True)
        raise

UMAPでベクトル化された情報の次元数を二次元に削減して散布図で表示できるようにします。そのうえで、HBDSCANでクラスタの密度を計算し、CountVectorizerで意味の近さを図る上で重要度の低いワード(ストップワード)を取り除くようにします。BERTopicにUMAPとHBDSCANとCountVectorizerをわたしてモデルの初期化をします。こうして作ったモデルにテキストとベクトル情報を渡すことによりX,Y座標や所属するクラスタ番号、そのクラスタの話題である可能性を0.0~1.0示したデータを返します。

labeling

Labelingでは作ったクラスタから確度が高いテキストをランダムで最大16個取り出して簡単にクラスタに名前をつける処理です。

import random

import pandas as pd


from langchain.prompts import ChatPromptTemplate
from langchain_xai import ChatXAI
from pydantic import BaseModel, Field, SecretStr
from tqdm import tqdm


class LabelData(BaseModel):
    label: str = Field(..., description="テキストから生成したラベルを格納する。")


def _get_label(config: dict, api_key: str, topics: list[str]) -> str:
    llm = ChatXAI(
        model=config["labeling"]["model"],
        api_key=SecretStr(api_key),
        temperature=0.0,
    )
    topic_data = "\n".join(topics).replace("{", "{{").replace("}", "}}")
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                config["labeling"]["system_prompt"],
            ),
            (
                "human",
                topic_data,
            ),
        ]
    )
    chain = prompt | llm.with_structured_output(LabelData)
    res: LabelData = chain.invoke({})  # type: ignore
    return res.label


def labeling(
    config: dict, api_key: str, idf: pd.DataFrame, threshold: float
) -> pd.DataFrame:
    """
    クラスタリングされた各グループに対して、その特徴をもっともよく表すラベルを生成します。

    Args:
        config (dict): 設定情報を含む辞書。
        api_key (str): XAI APIのキー。
        idf (pd.DataFrame): クラスタリングされたデータを含むDataFrame。
        threshold (float): ラベル生成に使用するデータの確率閾値。
                           この値以上の確率を持つデータのみがラベル生成に使用されます。

    Returns:
        pd.DataFrame: 各クラスタIDと対応するラベルを含むDataFrame。
    """
    data: list[dict[str, str]] = []
    cluster_ids = sorted(list(idf["cluster-id"].unique()))
    for cluster_id in tqdm(cluster_ids):
        cdf = idf[(idf["cluster-id"] == cluster_id) & (idf["probability"] >= threshold)]
        topics = cdf["topic"].tolist()
        random.shuffle(topics)
        topic_samples = [f'"""\n{t.strip()}\n"""' for t in topics[:16]]
        label = _get_label(config, api_key, topic_samples)
        data.append({"cluster-id": cluster_id, "label": label})
    return pd.DataFrame(data)

外れ値となるテキストは外した状態でラベルを作るのでクラスタの特徴を強めに押し出したラベル名をつけられます。体言止めにして簡潔なラベルにしてほしいとリクエストするとわかりやすいラベル名をつけてもらえるのでおすすめです。プロンプトは下記のとおりです。

model="grok-3-beta"
system_prompt = """
クラスタ分析の結果に従ってユーザーがテキストを提供します。クラスタに特徴を示すラベルを生成してください。
自明な文脈は含まず完結でかつ体言止めの形式でわかりやすく非常に簡潔なラベル付けが必要です。ラベル名は必ず日本語で出力してください。
"""

chart

前処理したテキスト、ラベル、クラスタリングのIDや座標を使うことで散布図に分析結果を落とし込めます。PlotlyのDashを使うとWebでユーザーが操作しながら内容を見られるグラフを作ることができ、Webサイトにも掲載しやすいので便利かと思います。

from typing import Literal
import plotly.express as px
import pandas as pd
import numpy as np


def _create_scatter_plot(
    config: dict,
    pdf: pd.DataFrame,
    ldf: pd.DataFrame,
    pallet_type: Literal[
        "high_contrast", "colorblind_friendly", "vivid"
    ] = "colorblind_friendly",
):
    df = pd.merge(pdf, ldf, on="cluster-id", how="left")

    # クラスターIDを文字列に変換(重要:これがないとカラーパレットが正しく適用されない)
    df["cluster-id"] = df["cluster-id"].astype(str)

    # より視認性の良いカラーパレットを使用
    color_palette_options = {
        # オプション1: Plotlyの標準的な高コントラストパレット
        "high_contrast": px.colors.qualitative.Plotly,
        # オプション2: 色覚多様性に配慮したパレット
        "colorblind_friendly": [
            "#1f77b4",
            "#ff7f0e",
            "#2ca02c",
            "#d62728",
            "#9467bd",
            "#8c564b",
            "#e377c2",
            "#7f7f7f",
            "#bcbd22",
            "#17becf",
            "#aec7e8",
            "#ffbb78",
            "#98df8a",
            "#ff9896",
            "#c5b0d5",
            "#c49c94",
            "#f7b6d3",
            "#c7c7c7",
            "#dbdb8d",
            "#9edae5",
            "#393b79",
            "#637939",
            "#8c6d31",
            "#843c39",
            "#7b4173",
            "#5254a3",
            "#8ca252",
            "#bd9e39",
            "#ad494a",
            "#a55194",
        ],
        # オプション3: より鮮明で区別しやすいカスタムパレット
        "vivid": [
            "#FF0000",
            "#00FF00",
            "#0000FF",
            "#FFFF00",
            "#FF00FF",
            "#00FFFF",
            "#800080",
            "#FFA500",
            "#008000",
            "#FF69B4",
            "#4169E1",
            "#DC143C",
            "#32CD32",
            "#FFD700",
            "#9932CC",
            "#FF4500",
            "#2E8B57",
            "#FF1493",
            "#1E90FF",
            "#228B22",
            "#B22222",
            "#4682B4",
            "#D2691E",
            "#9370DB",
            "#20B2AA",
            "#F4A460",
            "#8B008B",
            "#556B2F",
            "#CD5C5C",
            "#40E0D0",
        ],
    }
    selected_palette = color_palette_options[pallet_type]

    # クラスターの数を確認し、必要に応じてパレットを拡張
    unique_clusters = df["cluster-id"].unique()
    num_clusters = len(unique_clusters)

    if num_clusters > len(selected_palette):
        # パレットが足りない場合は繰り返し使用
        extended_palette = (
            selected_palette * ((num_clusters // len(selected_palette)) + 1)
        )[:num_clusters]
        selected_palette = extended_palette

    fig = px.scatter(
        df,
        x="x",
        y="y",
        color="cluster-id",
        color_discrete_sequence=selected_palette,
        hover_data=["topic", "cluster-id", "label"],
        title=config["chart"]["title"],
        opacity=0.8,
        size_max=12,
        # カテゴリの順序を明示的に指定
        category_orders={
            "cluster-id": sorted(
                unique_clusters, key=lambda x: int(x) if x != "-1" else -1
            )
        },
    )

    # マーカーの境界線を追加して区別しやすくする
    fig.update_traces(
        marker=dict(
            line=dict(width=0.8, color="white"),  # 境界線を少し太くして視認性向上
            size=7,  # マーカーサイズを少し大きく
        )
    )

    # cluster-idを数値に戻してグループ化処理
    df["cluster-id-numeric"] = df["cluster-id"].astype(int)

    cluster_centers = (
        df.groupby("cluster-id-numeric")
        .agg(
            x=("x", "mean"),
            y=("y", "mean"),
            label=(
                "label",
                lambda x: x.mode()[0] if not x.mode().empty else "",
            ),
            count=("cluster-id-numeric", "count"),  # クラスターのサイズも取得
        )
        .reset_index()
    )

    # クラスターサイズに基づいてラベル表示を選択的に行う
    cluster_centers = cluster_centers.sort_values("count", ascending=False)

    # 動的にラベルサイズとyshiftを調整
    max_clusters_to_label = min(20, len(cluster_centers))

    for i, row in cluster_centers.head(max_clusters_to_label).iterrows():
        if row["cluster-id-numeric"] == -1:
            continue

        # ラベルテキストをより短く調整
        label_text = row["label"]
        if len(label_text) > 12:
            label_text = label_text[:9] + "..."

        cluster_label = f"C{row['cluster-id-numeric']}: {label_text}"

        # クラスターサイズに応じてフォントサイズを調整
        font_size = max(9, min(12, 8 + row["count"] // 10))

        # 重複を避けるためにランダムなオフセットを追加
        np.random.seed(int(row["cluster-id-numeric"]))  # 再現性のためにシード固定
        x_offset = np.random.uniform(-0.1, 0.1)
        y_offset = np.random.uniform(0.1, 0.3)

        fig.add_annotation(
            x=row["x"] + x_offset,
            y=row["y"] + y_offset,
            text=cluster_label,
            showarrow=True,
            arrowhead=2,
            arrowsize=1,
            arrowwidth=1,
            arrowcolor="rgba(0, 0, 0, 0.6)",
            ax=0,
            ay=-20,
            font=dict(
                size=font_size,
                color="black",
                family="IPAexGothic, Noto Sans CJK JP, sans-serif",
                weight="bold",
            ),
            bgcolor="rgba(255, 255, 255, 0.95)",
            bordercolor="rgba(0, 0, 0, 0.5)",
            borderwidth=1.5,
            xanchor="center",
            yanchor="top",
        )

    # 残りの小さなクラスターには番号のみ表示
    for i, row in cluster_centers.tail(
        len(cluster_centers) - max_clusters_to_label
    ).iterrows():
        if row["cluster-id-numeric"] == -1:
            continue

        fig.add_annotation(
            x=row["x"],
            y=row["y"],
            text=f"C{row['cluster-id-numeric']}",
            showarrow=False,
            font=dict(
                size=8,
                color="white",
                family="IPAexGothic, Noto Sans CJK JP, sans-serif",
                weight="bold",
            ),
            bgcolor="rgba(0, 0, 0, 0.8)",
            bordercolor="rgba(255, 255, 255, 0.9)",
            borderwidth=1.5,
            xanchor="center",
            yanchor="middle",
        )

    fig.update_layout(
        font=dict(family="IPAexGothic, Noto Sans CJK JP, sans-serif", size=12),
        plot_bgcolor="white",
        paper_bgcolor="white",
        width=1080,
        height=610,
        legend=dict(
            orientation="v",
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.01,
            bgcolor="rgba(255, 255, 255, 0.95)",
            bordercolor="rgba(0, 0, 0, 0.4)",
            borderwidth=1.5,
            itemsizing="constant",
            font=dict(size=10, weight="bold"),
            # 凡例のタイトルを追加
            title=dict(text="Cluster ID", font=dict(size=12, weight="bold")),
        ),
        margin=dict(l=60, r=200, t=100, b=60),
        title=dict(font=dict(size=16, weight="bold"), x=0.5, xanchor="center"),
    )

    # 軸のグリッドを削除
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(showgrid=False)

    # ズーム機能を有効にして詳細確認を容易に
    fig.update_layout(xaxis=dict(fixedrange=False), yaxis=dict(fixedrange=False))

    return fig


def chart(
    config: dict,
    pdf: pd.DataFrame,
    ldf: pd.DataFrame,
    full_html: bool,
    pallet_type: Literal[
        "colorblind_friendly", "vivid", "high_contrast"
    ] = "colorblind_friendly",
) -> str:
    fig = _create_scatter_plot(config, pdf, ldf, pallet_type)
    return fig.to_html(full_html=full_html)

カラーパレットやレイアウトを編集すると見やすいグラフにもできるかと思いますので目的や見せ方にあわせて調整をいただくと良いかと思います。

all

初回の処理や定期的な処理でデータ抽出からグラフ作成まですべて処理したい場合にallを使うと良いです。クラスタ数の調整など細かい手直しがいるときに個別の処理を使うとよいかと思います。

分析結果

Plotlyの機能でページとして出力する場合にはfull_htmlをTrueにし、ページ内に組み込む目的であればfull_htmlをFalseにすると分析結果をファイルで共有したり、HTMLに組み込んでWeb上に公開したりできます。このサイトの場合だとAstroを使用しているのでmdxに本記事を書いて、結果のHTMLを別途コンポーネントとして保管し、importさせて表示しています。

ご覧の通り2023年7月から2025年5月末までいろいろなことをやっていてそれとなくクラスタわけされている様子が見えます。個別の点にカーソルを合わせると実際にやったことが見ることができ、個別にどのようなことをしてきたかを見たり、個別に見る中でやっていることのつながりを判断したりしやすくなります。ブロードリスニング自体は無数の声を切り分けて分類し、個別に見たり全体でみたりできる手法なので東京都の事例のように意見やアンケートを分析する以外にも、Blueskyの自身のポストの分析などにも流用ができます。ただ、目的や方向性を持ってデータの前処理や分割をすることが大事で、そのためにも事前に用意されているデータの傾向や方向性、テーマなども把握しておくことも欠かせないと思います。

まとめ

身近にあるテキストを自動で取得し、可視化するまでの一連の流れを確認しました。ブロードリスニングはテキストの準備や生成AIの活用などで難しそうに見えますが、人間が手作業で処理すると非常に手間がかかる手順をAIで楽に判断・分類してもらえることが体感できるかと思います。また、Blueskyなど自身で所有するテキストであれば再公開も含めて柔軟に対応もできます。 自身のSNSのポストやメモなどであれば簡単に気兼ねなく分析や再公開ができますし、公私ともに分析と改善をわかりやすく行えて使い勝手のいい技術・分析手法ですのでぜひブロードリスニングの手法を自身の所有するテキストにあわせて適用いただけると良いのかと思います。

関連記事