Scriptone Scriptone

자신의 Bluesky 게시물에서 브로드 리스닝하기 (Python)

개요

본 문서에서는 Bluesky에 직접 게시한 텍스트 데이터를 활용하여 Python, 생성형 AI, 그리고 클러스터링 기술을 사용하여 “브로드 리스닝”을 수행하는 방법을 설명합니다. 브로드 리스닝이란 수많은 목소리와 다양한 의견을 효율적으로 집계하고 정리하여 전체적인 맥락과 경향을 파악하는 기법입니다. 개인적인 용도에 초점을 맞춘 본 문서에서는 저작권 및 이용 약관을 준수하면서 본인의 게시물 데이터만을 분석 대상으로 삼습니다.

브로드 리스닝이란?

브로드 리스닝은 많은 사람들의 다양한 목소리를 빠짐없이 듣기 위한 일련의 기법을 총칭합니다. 과거에는 기업, 정치인, 연예인, 인플루언서 등이 미디어와 SNS를 통해 대중에게 정보를 일방적으로 브로드캐스트했습니다. 반대로 대중의 목소리를 취합하고 정리하여 효율적으로 볼 수 있는 방법이 부족하여, 사람이 직접 정리하고 보고서로 작성해야 했습니다. 생성형 AI의 등장으로 인해 방대한 텍스트를 분석하기 쉽게 전처리하고, 분류하며, 분류된 텍스트에 라벨을 붙이고, 요약하는 일련의 절차가 효율적으로 가능해졌습니다. 이를 통해 방대한 텍스트에서 전체적인 맥락과 세부적인 내용을 모두 파악할 수 있게 되었습니다.

일본 국내에서는 도쿄도에서 “2050년대 도쿄는 어떤 모습이 되기를 바라는가?”라는 주제로 브로드 리스닝 분석 결과와 코드, 프롬프트가 공개되었습니다. 대중으로부터 다양한 목소리가 모여 전체를 조감하는 모습에 대해서는 도쿄도의 사례를 참고하시면 좋을 것입니다.

이번 분석 내용

본 문서에서는 Bluesky에 직접 게시한 텍스트를 사용하여 과거에 어떤 활동을 해왔는지 관점에서 전처리 및 클러스터링을 수행합니다. 업무 등에서는 1차 데이터나 2차 데이터 등 설문조사 데이터를 사용하여 특정 목적에 따라 분석을 수행할 수도 있습니다. 그러나 개인이 동일하게 시도할 경우 저작권 문제로 텍스트를 다루기 어렵거나 재공개가 불가능한 문제가 발생할 수 있습니다. 하지만 본인이 직접 게시한 Bluesky 게시물은 저작권 문제없이 가공하여 재공개하는 데 문제가 없습니다. 다만, 이는 2025년 5월 말 기준 정보이므로, 실제로 Bluesky 게시물을 사용하여 브로드 리스닝을 수행할 경우에는 Bluesky의 이용 약관을 반드시 확인해 주시기 바랍니다.

저장소 (Repository)

저장소는 GitHub에서 관리됩니다. README.md에 따라 클론하고 라이브러리 등을 설치해 주세요. 클러스터링용 라이브러리를 위해 별도의 환경 구축이 필요할 수 있으므로, uv sync로 라이브러리 설치에 실패할 경우 Perplexity 등에서 환경 구축 방법을 찾아보시기 바랍니다. 아마도 틈새 정보는 아닐 것이므로 환경 구축에 도움이 될 것입니다. URL: https://github.com/rmc8/bsky_listening

코드 설명

브로드 리스닝 프로그램은 Google의 fire 라이브러리를 사용하여 CLI로 실행되도록 구성되어 있습니다. 실행 방법은 저장소의 README.md에 설명되어 있지만, uv run main.py all과 같이 명령을 설정하여 실행합니다. xAI나 Bluesky 등 외부 API를 사용하므로, API 키나 앱 비밀번호 등을 미리 .evn 또는 PC 환경 변수에 설정해야 합니다. CLI로 실행되는 처리의 대략적인 흐름은 다음과 같습니다.

  • main.py: 아래 기능을 CLI로 조작하는 기능을 제공합니다.
    • fetch: Bluesky에서 본인의 게시물을 가져옵니다.
    • preproc: 게시물을 브로드 리스닝에 적합한 형태로 정돈하고, 여러 토픽이 포함된 경우 하나씩 분할합니다.
    • embedding: 전처리된 텍스트를 벡터화합니다 (Ollama 및 OpenAI의 임의 모델 사용 가능).
    • clustering: 벡터화된 데이터를 클러스터링하여 X, Y 좌표와 클러스터 번호를 표시합니다.
    • labeling: 각 클러스터의 특징을 나타내는 간단한 라벨을 생성합니다.
    • chart: preproc, clustering, labeling 데이터를 사용하여 산점도를 생성합니다.
    • all: 위 fetch부터 chart까지의 일련의 모든 처리를 실행합니다.

main.py


main.py는 브로드 리스닝에 필요한 각 단계를 CLI로 조작할 수 있도록 구성되어 있습니다. 모든 처리 내용은 BskyListening 클래스에 통합되어 있으며, CLI를 통해 메서드를 실행하여 각 단계를 수행할 수 있습니다. 설정 파일은 TOML 형식으로 작성하고, API 키와 같은 민감한 값은 .env 파일이나 환경 변수에서 읽어오도록 합니다.

import os
import tomllib
from datetime import datetime
from pathlib import Path
from typing import Any

import fire
import pandas as pd
from dotenv import load_dotenv


from libs import bsky, chart, preproc, embedding, clustering, labeling
from libs.file_name import FileName

load_dotenv()

now = datetime.now()
pj_name = f"{now:%Y%m%d}"


THIS_DIR = Path(__file__).parent
PROJECT_DIR = THIS_DIR / "project"


def get_config() -> dict[str, Any]:
    config_file = THIS_DIR / "config.toml"
    with config_file.open("rb") as b:
        return tomllib.load(b)


class BskyListening:
    """
    Bluesky 소셜 미디어 플랫폼에서 특정 사용자의 게시물을 가져와,
    그 내용을 분석하고 시각화하기 위한 일련의 처리를 제공하는 클래스입니다.
    """

    @staticmethod
    def fetch(pj_name: str = pj_name, limit: int = 500):
        """
        지정된 Bluesky 계정에서 게시물 데이터를 가져와 TSV 형식으로 저장합니다.
        리포스트는 제외됩니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
            limit (int): 가져올 게시물의 최대 개수. 기본값은 500.
        """
        df = bsky.fetch(
            config=get_config(), app_pass=str(os.getenv("BSKY_APP_PASS")), limit=limit
        )
        io_dir = PROJECT_DIR.joinpath(pj_name)
        os.makedirs(io_dir, exist_ok=True)
        output_path = io_dir.joinpath(FileName.bsky_posts.value)
        df.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def preproc(pj_name: str = pj_name):
        """
        가져온 게시물 텍스트에서 주요 토픽을 추출하고, 분석하기 쉬운 형태로 정돈합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        input_path = io_dir.joinpath(FileName.bsky_posts.value)
        idf = pd.read_csv(input_path, sep="\t")
        odf = preproc.preproc(
            config=get_config(),
            api_key=str(os.getenv("XAI_API_KEY")),
            idf=idf,
        )
        output_path = io_dir.joinpath(FileName.preproc.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def _embedding_by_openai(idf: pd.DataFrame) -> pd.DataFrame:
        """
        OpenAI 모델을 사용하여 임베딩을 생성합니다.

        Args:
            idf (pd.DataFrame): 전처리된 토픽 데이터를 포함하는 DataFrame.

        Returns:
            pd.DataFrame: 임베딩 데이터를 포함하는 DataFrame.
        """
        return embedding.embed_by_openai(
            config=get_config(),
            api_key=str(os.getenv("OPENAI_API_KEY")),
            idf=idf,
        )

    @staticmethod
    def _embedding_by_ollama(idf: pd.DataFrame) -> pd.DataFrame:
        """
        Ollama 모델을 사용하여 임베딩을 생성합니다.

        Args:
            idf (pd.DataFrame): 전처리된 토픽 데이터를 포함하는 DataFrame.

        Returns:
            pd.DataFrame: 임베딩 데이터를 포함하는 DataFrame.
        """
        return embedding.embed_by_ollama(
            config=get_config(),
            idf=idf,
        )

    @classmethod
    def embedding(cls, pj_name: str = pj_name, is_local: bool = True):
        """
        전처리된 토픽 데이터에서 기계 학습 모델(OpenAI 또는 Ollama)을 사용하여
        수치 벡터(임베딩)를 생성합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
            is_local (bool): 로컬 Ollama 모델을 사용할지 여부. 기본값은 True.
                                False인 경우 OpenAI 모델을 사용합니다.
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        input_path = io_dir.joinpath(FileName.preproc.value)
        idf = pd.read_csv(input_path, sep="\t")
        embedding_func = (
            cls._embedding_by_ollama if is_local else cls._embedding_by_openai
        )
        odf = embedding_func(idf)
        output_path = io_dir.joinpath(FileName.embedding.value)
        odf.to_pickle(output_path)

    @staticmethod
    def clustering(pj_name: str = pj_name):
        """
        생성된 임베딩 데이터를 사용하여 게시물을 유사성에 따라 클러스터링합니다.
        UMAP, HDBSCAN, BERTopic, Spectral Clustering을 조합하여 사용합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        embedding_path = io_dir.joinpath(FileName.embedding.value)
        idf = pd.read_csv(preproc_path, sep="\t")
        edf = pd.read_pickle(embedding_path)
        odf = clustering.clustering(
            config=get_config(),
            idf=idf,
            edf=edf,
        )
        output_path = io_dir.joinpath(FileName.clustering.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def labeling(pj_name: str = pj_name, threshold: float = 0.70):
        """
        클러스터링된 각 그룹에 대해 그 특징을 가장 잘 나타내는 라벨을 생성합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
            threshold (float): 라벨 생성에 사용할 데이터의 확률 임계값. 기본값은 0.70.
                                이 값 이상의 확률을 가진 데이터만 라벨 생성에 사용됩니다.
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        clustering_path = io_dir.joinpath(FileName.clustering.value)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        pdf = pd.read_csv(preproc_path, sep="\t")
        cdf = pd.read_csv(clustering_path, sep="\t")
        idf = pd.merge(pdf, cdf, on=["index"], how="left")
        odf = labeling.labeling(
            config=get_config(),
            api_key=str(os.getenv("XAI_API_KEY")),
            idf=idf,
            threshold=threshold,
        )
        output_path = io_dir.joinpath(FileName.labeling.value)
        odf.to_csv(output_path, sep="\t", index=False)

    @staticmethod
    def chart(pj_name: str = pj_name, full_html: bool = True):
        """
        클러스터링 및 라벨링 결과를 기반으로 대화형 산점도(HTML 형식)를 생성합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
            full_html (bool): 차트를 완전한 HTML로 출력할지 여부. 기본값은 True.
        """
        io_dir = PROJECT_DIR.joinpath(pj_name)
        clustering_path = io_dir.joinpath(FileName.clustering.value)
        preproc_path = io_dir.joinpath(FileName.preproc.value)
        pdf = pd.read_csv(preproc_path, sep="\t")
        cdf = pd.read_csv(clustering_path, sep="\t")
        idf = pd.merge(pdf, cdf, on=["index"], how="left")
        labeling_path = io_dir.joinpath(FileName.labeling.value)
        ldf = pd.read_csv(labeling_path, sep="\t")
        html = chart.chart(config=get_config(), pdf=idf, ldf=ldf, full_html=full_html)
        output_path = io_dir.joinpath(FileName.chart.value)
        with open(output_path, "w") as f:
            f.write(html)

    @classmethod
    def all(
        cls,
        pj_name: str = pj_name,
        limit: int = 500,
        is_local: bool = True,
        threshold: float = 0.70,
        full_html: bool = True,
    ):
        """
        Bluesky 게시물 가져오기부터 차트 생성까지의 일련의 모든 처리를 실행합니다.

        Args:
            pj_name (str): 프로젝트 이름. 기본값은 현재 날짜 (YYYYMMDD).
            limit (int): 가져올 게시물의 최대 개수. 기본값은 500.
            is_local (bool): 임베딩 생성 시 로컬 Ollama 모델을 사용할지 여부.
                                기본값은 True. False인 경우 OpenAI 모델을 사용합니다.
            threshold (float): 라벨 생성에 사용할 데이터의 확률 임계값. 기본값은 0.70.
                                이 값 이상의 확률을 가진 데이터만 라벨 생성에 사용됩니다.
            full_html (bool): 차트를 완전한 HTML로 출력할지 여부. 기본값은 True.
        """
        cls.fetch(pj_name=pj_name, limit=limit)
        cls.preproc(pj_name=pj_name)
        cls.embedding(pj_name=pj_name, is_local=is_local)
        cls.clustering(pj_name=pj_name)
        cls.labeling(pj_name=pj_name, threshold=threshold)
        cls.chart(pj_name=pj_name, full_html=full_html)


def main():
    fire.Fire(BskyListening)


if __name__ == "__main__":
    main()

fetch

fetch 함수는 분석 대상인 Bluesky 게시물을 가져옵니다. toml 파일에 Bluesky 계정의 핸들을 입력하고 앱 비밀번호를 환경 변수에 설정하면, 지정된 핸들의 게시물을 limit에 도달하거나 모든 게시물을 가져올 때까지 계속해서 가져옵니다. 가져온 데이터는 uri, cid, text, created_at의 4개 열로 구성되어 출력됩니다.

import logging
from time import sleep
from typing import Any


from atproto import Client
from pandas import DataFrame
from retry import retry


logger = logging.getLogger(__name__)


@retry(tries=5, delay=6.0)
def _get_timeline(c: Client, actor: str, cursor: str | None):
    res = c.get_author_feed(actor=actor, cursor=cursor, limit=100)
    return res


def fetch(config: dict[str, Any], app_pass: str, limit: int) -> DataFrame:
    logging.basicConfig(level=logging.INFO)
    handle = config["bluesky"]["handle"]
    c = Client()
    c.login(login=handle, password=app_pass)
    posts: list[dict[str, Any]] = []
    cursor: str | None = None
    while True:
        res = _get_timeline(c, handle, cursor)
        for feed_view in res.feed:
            post = feed_view.post
            if post.viewer is not None and post.viewer.repost is not None:
                continue
            posts.append(
                {
                    "uri": post.uri,
                    "cid": post.cid,
                    "text": post.record.text,
                    "created_at": post.record.created_at,
                }
            )
        cursor = post.record.created_at
        if res.feed:
            cursor = res.feed[-1].post.record.created_at
            logger.info("cursor: %s, len: %s", cursor, len(posts))
            continue
        if res.cursor is None or len(posts) >= limit or len(res.feed) == 0:
            break
        sleep(1.0)
    return DataFrame(posts)

이 처리가 가장 많은 비용과 시간이 소요될 것으로 예상됩니다. 필요에 따라 저렴하거나 빠른 모델을 사용하거나, 도쿄도 사례처럼 여러 스레드(2~4개 정도)를 사용하여 동시에 처리하면 비용 절감 및 속도 향상을 꾀할 수 있습니다.

preproc

분석을 위해 가져온 게시물을 정리합니다. 게시물을 하나씩 읽어 들이고, 분석 목적에 맞게 텍스트를 가공합니다.

import logging
import pandas as pd


from langchain.prompts import ChatPromptTemplate
from langchain_xai import ChatXAI
from pydantic import BaseModel, Field, SecretStr
from tqdm import tqdm


class TopicData(BaseModel):
    topics: list[str] = Field(
        ...,
        description="사용자의 게시물에서 추출된 주요 활동/행동 토픽 목록. 여러 행동이 포함된 경우 분할됩니다.",
    )


def _get_topics(config: dict, api_key: str, post: str):
    llm = ChatXAI(
        model=config["preproc"]["model"],
        api_key=SecretStr(api_key),
        temperature=0.0,
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                config["preproc"]["system_prompt"],
            ),
            (
                "human",
                post.replace("{", "{{").replace("}", "}}"),
            ),
        ]
    )
    chain = prompt | llm.with_structured_output(TopicData)
    res: TopicData = chain.invoke({})  # type: ignore
    return res.topics


def preproc(config: dict, api_key: str, idf: pd.DataFrame):
    logging.basicConfig(level=logging.WARNING)
    data = []
    index = 0
    idf = idf.dropna(subset=["text"])
    for rec in tqdm(idf.to_dict("records")):
        topics = _get_topics(config, api_key, rec["text"])
        for topic in topics:
            data.append(
                {
                    "index": index,
                    "cid": rec["cid"],
                    "topic": topic,
                }
            )
            index += 1
    return pd.DataFrame(data)

분석에는 xAI 모델을 사용합니다. xAI는 학습 데이터로 채팅을 제공하면 매월 150달러의 무료 크레딧을 제공하므로, 일단 브로드 리스닝을 시도해 볼 목적이라면 비용 걱정 없이 사용해 볼 수 있습니다. 제공하지 않도록 선택하면 자비로 이용해야 하지만, 학습에 사용되지 않고 API를 활용할 수 있습니다.

모델에 전달되는 시스템 프롬프트는 다음과 같습니다.

[preproc]
model = "grok-3-fast-beta"
system_prompt = """
당신은 데이터 분석가입니다. Bluesky 사용자로부터 게시물을 받아 브로드 리스닝하기 쉽도록 텍스트를 가공하는 역할을 맡습니다.
이번에는 사용자가 어떤 활동을 했는지에 초점을 맞춰 토픽을 추출하고자 합니다. 여러 활동이 포함된 경우에는 각 활동을 하나씩 분할하여 분석하기 쉽게 만드세요.
또한, 처리하여 목록에 저장하는 텍스트는 사용자가 SNS에 쓰는 것처럼 원문의 어조를 바꾸지 않고 사용자다운 어조를 유지하여 추출해 주시기 바랍니다. 해당 토픽이 아무것도 없는 경우에는 빈 목록을 반환하세요.

## 나쁜 예

라즈베리 파이 5 샀다 => ["라즈베리 파이 5 샀어요!"] # 어미를 마음대로 바꾸면 안 됩니다
배불러😊 병원 시간까지 느긋하게 독서! => ["느긋하게 독서하고 있어!"] # 어미를 마음대로 바꾸면 안 됩니다
슈퍼에서 커피 원두 받았다 => ["슈퍼에서 커피 원두 받아서 기뻐요!"] # '기뻐요'는 게시물에 없으므로 사실에 근거하지 않아 안 됩니다

## 좋은 예

배불러😊 병원 시간까지 느긋하게 독서! => ["병원 시간까지 느긋하게 독서!"] # 원문을 유지하면서 핵심 활동을 간결하게 전달하므로 좋습니다
손에 든 MCP로 일단 책 검색 및 작업 관리 기능 등 소소한 것들이 있지만, 조금씩 기능을 추가하여 Obsidian이나 Cline에 통합하고 싶다. => ["MCP 기능을 조금씩 추가하여 Obsidian이나 Cline에 통합하고 싶다"] # 핵심 활동을 간결하게 추출하고 어미와 의미를 유지하므로 좋습니다
"""

분석 시에는 (1) 텍스트를 어떤 목적으로 정리해야 하는지 명확히 하고, (2) 복합적인 내용을 하나의 토픽으로 분할하는 것이 중요합니다. 또한, 나쁜 예좋은 예를 활용하여 출력 내용을 의도대로 제어할 수 있습니다. 본인의 게시물 경향에 맞춰 예시를 맞춤 설정하면 전처리가 용이할 것입니다. 따라서, fetch 단계에서 limit을 약 50개 정도로 제한하여 소량의 데이터로 출력 품질을 평가하는 것이 좋습니다.

embedding

Embedding 과정에서는 전처리된 텍스트를 벡터로 변환합니다. 이 벡터화는 OllamaOpenAI를 모두 지원합니다. GPU가 장착된 PC를 사용한다면, Ollama를 통해 챗 모델보다 계산량이 적기 때문에 쾌적하고 무료로 벡터화를 수행할 수 있습니다. GPU가 없거나 Ollama 설정이 번거롭다면 OpenAI API를 사용하는 것도 좋은 선택입니다. OpenAI API는 xAI와 달리 유료이지만, 임베딩 모델은 챗 모델보다 훨씬 저렴합니다. 특히 text-embedding-3-small 모델의 경우 100만 토큰당 약 0.02달러 정도로, 많은 양의 텍스트를 처리하더라도 비용 부담이 적을 겁니다.


코드 설명

from langchain_ollama import OllamaEmbeddings
from langchain_openai import OpenAIEmbeddings
from pandas import DataFrame
from pydantic import SecretStr
from tqdm import tqdm


class Embedding:
    def __init__(self, model: OllamaEmbeddings | OpenAIEmbeddings) -> None:
        self.model = model

    def _embed(self, docs: list[str]) -> list[list[float]]:
        # 선택된 모델을 사용하여 문서 목록을 임베딩합니다.
        return self.model.embed_documents(docs)

    def run(self, idf: DataFrame, batch_size: int = 1000) -> DataFrame:
        embeddings = []
        # DataFrame을 배치 단위로 나누어 처리하여 메모리 효율성을 높입니다.
        for i in tqdm(range(0, len(idf), batch_size)):
            topics = idf["topic"].tolist()[i : i + batch_size]
            embeds = self._embed(topics)
            embeddings.extend(embeds)
        # 임베딩 결과를 DataFrame으로 변환하여 반환합니다.
        df = DataFrame(
            [
                {
                    "index": idf.iloc[i]["index"],
                    "cid": idf.iloc[i]["cid"],
                    "embedding": e,
                }
                for i, e in enumerate(embeddings)
            ]
        )
        return df


def embed_by_openai(config: dict, api_key: str, idf: DataFrame) -> DataFrame:
    # OpenAI 임베딩 모델을 초기화하고 실행합니다.
    emb_model = OpenAIEmbeddings(
        model=config["embedding"]["openai_model"],
        api_key=SecretStr(api_key),
    )
    e = Embedding(emb_model)
    return e.run(idf)


def embed_by_ollama(config: dict, idf: DataFrame) -> DataFrame:
    # Ollama 임베딩 모델을 초기화하고 실행합니다.
    emb_model = OllamaEmbeddings(
        model=config["embedding"]["ollama_model"],
        base_url=config["embedding"]["ollama_base_url"],
    )
    e = Embedding(emb_model)
    return e.run(idf)

clustering


클러스터링에서는 벡터화된 데이터를 활용하여 클러스터링을 수행합니다. config.toml 파일에서 클러스터 수를 미리 설정하여 클러스터링을 진행합니다.

[clustering]
n_clusters = 44

코드는 아래와 같습니다.

import logging

import pandas as pd
import numpy as np
from janome.tokenizer import Tokenizer
from umap import UMAP
from hdbscan import HDBSCAN
from sklearn.feature_extraction.text import CountVectorizer
from bertopic import BERTopic

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger(__name__)

STOP_WORDS = [
    "의",  # の (no)
    "에",  # に (ni)
    "는",  # は (ha)
    "을",  # を (wo)
    "했다",  # た (ta) - past tense ending
    "이",  # が (ga)
    "에서",  # で (de)
    "하고",  # て (te) - connecting particles
    "와",  # と (to)
    "하고",  # し (shi) - verb stem
    "되다",  # れ (re) - passive/potential ending
    "하다",  # さ (sa) - noun-forming suffix
    "있다",  # ある (aru) - existence (inanimate)
    "있다",  # いる (iru) - existence (animate)
    "하다",  # する (suru) - to do
    "부터",  # から (kara) - from
    "이다",  # な (na) - adjectival suffix
    "것",  # こと (koto) - nominalizer
    "로서",  # として (toshite) - as, for
    "가다",  # いく (iku) - to go
    "않다",  # ない (nai) - negative
]
TOKENIZER = Tokenizer()


def _tokenize_japanese(text: str) -> list[str]:
    """
    일본어 텍스트를 Janome으로 토큰화하고, 불용어를 제외합니다.

    Args:
        text (str): 토큰화할 일본어 텍스트.

    Returns:
        list[str]: 불용어가 제외된 토큰 목록.
    """
    return [
        token.surface
        for token in TOKENIZER.tokenize(text)
        if token.surface not in STOP_WORDS
    ]


def _perform_clustering(
    documents: list[str],
    embeddings: np.ndarray,
    metadata_dict: dict,
    min_cluster_size: int = 10,
    n_components: int = 2,
    num_topics: int = 6,
) -> pd.DataFrame:
    """
    주어진 문서와 임베딩을 사용하여 UMAP, HDBSCAN, BERTopic을 조합한
    클러스터링을 실행합니다. 클러스터 수는 사용자가 설정한 고정값을 사용합니다.

    Args:
        documents (list[str]): 클러스터링 대상 문서 목록.
        embeddings (np.ndarray): 문서에 해당하는 임베딩 NumPy 배열.
        metadata_dict (dict): 문서의 추가 메타데이터를 포함하는 딕셔너리.
                              결과 DataFrame에 병합됩니다.
        min_cluster_size (int, optional): HDBSCAN의 최소 클러스터 크기. 기본값은 2.
        n_components (int, optional): UMAP의 차원 축소 후 차원 수. 기본값은 2.
        num_topics (int, optional): BERTopic에서 생성할 토픽(클러스터) 수. 기본값은 6.

    Returns:
        pd.DataFrame: 클러스터링 결과를 포함하는 DataFrame.
                      각 문서의 인덱스, UMAP 좌표(x, y), 확률, 클러스터 ID가 포함됩니다.

    Raises:
        Exception: 클러스터링 처리 중 오류가 발생한 경우.
    """
    logger.info(f"Starting clustering with {len(documents)} documents")

    try:
        # UMAP 차원 축소 모델 초기화
        logger.info("Initializing UMAP dimensionality reduction model")
        umap_model = UMAP(
            random_state=42,
            n_components=n_components,
            n_jobs=-1,
        )

        # HDBSCAN 클러스터링 모델 초기화
        logger.info("Initializing HDBSCAN clustering model")
        hdbscan_model = HDBSCAN(
            min_cluster_size=min_cluster_size,
            min_samples=1,
            core_dist_n_jobs=-1,
        )

        # CountVectorizer 설정
        logger.info("Setting up CountVectorizer")
        vectorizer_model = CountVectorizer(tokenizer=_tokenize_japanese)

        # BERTopic 모델 초기화 및 클러스터 수 고정
        logger.info(f"Initializing BERTopic model with nr_topics={num_topics}")
        topic_model = BERTopic(
            umap_model=umap_model,
            hdbscan_model=hdbscan_model,
            vectorizer_model=vectorizer_model,
            verbose=True,
            nr_topics=num_topics,
        )

        # BERTopic 모델 피팅 및 토픽 할당 가져오기
        logger.info("Fitting BERTopic model and getting topics")
        topics, probs = topic_model.fit_transform(documents, embeddings=embeddings)

        # UMAP 좌표 가져오기
        logger.info("Getting UMAP coordinates")
        umap_embeddings = topic_model.umap_model.transform(embeddings)

        # 결과 DataFrame 생성
        logger.info("Generating result DataFrame")
        result_df = pd.DataFrame(
            {
                "index": metadata_dict["index"],
                "cid": metadata_dict["cid"],
                "x": umap_embeddings[:, 0],
                "y": umap_embeddings[:, 1],
                "probability": probs,
                "cluster-id": topics,
            }
        )

        logger.info(
            f"Clustering completed successfully with {len(set(result_df['cluster-id']))} clusters"
        )
        return result_df

    except Exception as e:
        logger.error(f"Error during clustering: {str(e)}", exc_info=True)
        raise


def clustering(config: dict, idf: pd.DataFrame, edf: pd.DataFrame) -> pd.DataFrame:
    """
    클러스터링 파이프라인의 진입점.
    설정, 입력 DataFrame, 임베딩 DataFrame을 받아 클러스터링 결과를 반환합니다.

    Args:
        config (dict): 클러스터링 설정을 포함하는 딕셔너리.
                        `config["clustering"]["n_clusters"]`로 클러스터 수를 지정합니다.
        idf (pd.DataFrame): 토픽과 메타데이터를 포함하는 입력 DataFrame.
                            "topic", "index", "cid" 열이 필요합니다.
        edf (pd.DataFrame): 임베딩 데이터를 포함하는 DataFrame.
                            "embedding" 열이 필요합니다.

    Returns:
        pd.DataFrame: 클러스터링 결과를 포함하는 DataFrame.
                      각 문서의 인덱스, UMAP 좌표(x, y), 확률, 클러스터 ID가 포함됩니다.

    Raises:
        Exception: 클러스터링 파이프라인 중 오류가 발생한 경우.
    """
    logger.info("Starting clustering pipeline")
    try:
        logger.info("Loading topic data")
        documents_to_cluster = idf["topic"].values.tolist()
        document_embeddings = np.asarray(edf["embedding"].values.tolist())

        num_clusters = int(config["clustering"]["n_clusters"])
        logger.info(f"Configured for {num_clusters} clusters")

        # 클러스터링 실행
        clustering_result = _perform_clustering(
            documents=documents_to_cluster,
            embeddings=document_embeddings,
            metadata_dict={
                "index": idf["index"].values,
                "cid": idf["cid"].values,
            },
            min_cluster_size=10,
            num_topics=num_clusters,
        )
        logger.info("Clustering pipeline completed successfully")
        return clustering_result
    except Exception as e:
        logger.error(f"Error in clustering pipeline: {str(e)}", exc_info=True)
        raise

UMAP을 사용하여 벡터화된 정보의 차원수를 2차원으로 축소하여 산점도로 표시할 수 있도록 합니다. 이어서 HDBSCAN으로 클러스터의 밀도를 계산하고, CountVectorizer로 의미의 유사성을 측정하는 데 중요도가 낮은 단어(불용어)를 제거합니다. BERTopic에 UMAP, HDBSCAN, CountVectorizer를 전달하여 모델을 초기화합니다. 이렇게 만들어진 모델에 텍스트와 벡터 정보를 전달하면 X, Y 좌표, 소속 클러스터 번호, 해당 클러스터 주제일 확률(0.0~1.0)을 나타내는 데이터를 반환합니다.


라벨링 (Labeling)

라벨링은 생성된 클러스터에서 확률이 높은 텍스트를 무작위로 최대 16개 추출하여 클러스터에 간단한 이름을 부여하는 처리입니다.

import random

import pandas as pd


from langchain.prompts import ChatPromptTemplate
from langchain_xai import ChatXAI
from pydantic import BaseModel, Field, SecretStr
from tqdm import tqdm


class LabelData(BaseModel):
    label: str = Field(..., description="텍스트에서 생성한 라벨을 저장합니다.")


def _get_label(config: dict, api_key: str, topics: list[str]) -> str:
    llm = ChatXAI(
        model=config["labeling"]["model"],
        api_key=SecretStr(api_key),
        temperature=0.0,
    )
    topic_data = "\n".join(topics).replace("{", "{{").replace("}", "}}")
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                config["labeling"]["system_prompt"],
            ),
            (
                "human",
                topic_data,
            ),
        ]
    )
    chain = prompt | llm.with_structured_output(LabelData)
    res: LabelData = chain.invoke({})  # type: ignore
    return res.label


def labeling(
    config: dict, api_key: str, idf: pd.DataFrame, threshold: float
) -> pd.DataFrame:
    """
    클러스터링된 각 그룹에 대해 그 특징을 가장 잘 나타내는 라벨을 생성합니다.

    Args:
        config (dict): 설정 정보를 포함하는 딕셔너리.
        api_key (str): XAI API 키.
        idf (pd.DataFrame): 클러스터링된 데이터를 포함하는 DataFrame.
        threshold (float): 라벨 생성에 사용할 데이터의 확률 임계값.
                           이 값 이상의 확률을 가진 데이터만 라벨 생성에 사용됩니다.

    Returns:
        pd.DataFrame: 각 클러스터 ID와 해당 라벨을 포함하는 DataFrame.
    """
    data: list[dict[str, str]] = []
    cluster_ids = sorted(list(idf["cluster-id"].unique()))
    for cluster_id in tqdm(cluster_ids):
        # 특정 클러스터 ID와 확률 임계값을 만족하는 데이터 필터링
        cdf = idf[(idf["cluster-id"] == cluster_id) & (idf["probability"] >= threshold)]
        topics = cdf["topic"].tolist()
        random.shuffle(topics)  # 토픽을 무작위로 섞음
        # 최대 16개의 토픽 샘플을 가져와 형식화
        topic_samples = [f'"""\n{t.strip()}\n"""' for t in topics[:16]]
        label = _get_label(config, api_key, topic_samples)
        data.append({"cluster-id": cluster_id, "label": label})
    return pd.DataFrame(data)

클러스터의 특징을 강하게 나타내는 라벨 이름을 부여하기 위해, 이상치 텍스트는 제외한 상태로 라벨을 생성합니다. 명사형으로 간결하게 라벨링해 달라고 요청하면 이해하기 쉬운 라벨을 얻을 수 있으므로 추천합니다. 프롬프트는 아래와 같습니다.

model="grok-3-beta"
system_prompt = """
클러스터 분석 결과에 따라 사용자가 텍스트를 제공합니다. 클러스터의 특징을 나타내는 라벨을 생성해 주세요.
자명한 문맥은 포함하지 않고 간결하며 명사형으로 이해하기 매우 쉬운 라벨링이 필요합니다. 라벨 이름은 반드시 한국어로 출력해 주세요.
"""

chart

전처리된 텍스트, 라벨, 클러스터링 ID 및 좌표를 활용하여 산점도에 분석 결과를 시각화할 수 있습니다. Plotly Dash를 사용하면 사용자가 웹에서 직접 조작하며 내용을 확인할 수 있는 대화형 그래프를 만들 수 있으며, 웹사이트에 게시하기도 용이하여 편리합니다.

from typing import Literal
import plotly.express as px
import pandas as pd
import numpy as np


def _create_scatter_plot(
    config: dict,
    pdf: pd.DataFrame,
    ldf: pd.DataFrame,
    pallet_type: Literal[
        "high_contrast", "colorblind_friendly", "vivid"
    ] = "colorblind_friendly",
):
    df = pd.merge(pdf, ldf, on="cluster-id", how="left")

    # 클러스터 ID를 문자열로 변환 (중요: 이 과정이 없으면 컬러 팔레트가 올바르게 적용되지 않습니다)
    df["cluster-id"] = df["cluster-id"].astype(str)

    # 시인성이 더 좋은 컬러 팔레트 사용
    color_palette_options = {
        # 옵션1: Plotly의 표준 고대비 팔레트
        "high_contrast": px.colors.qualitative.Plotly,
        # 옵션2: 색맹 친화적인 팔레트
        "colorblind_friendly": [
            "#1f77b4",
            "#ff7f0e",
            "#2ca02c",
            "#d62728",
            "#9467bd",
            "#8c564b",
            "#e377c2",
            "#7f7f7f",
            "#bcbd22",
            "#17becf",
            "#aec7e8",
            "#ffbb78",
            "#98df8a",
            "#ff9896",
            "#c5b0d5",
            "#c49c94",
            "#f7b6d3",
            "#c7c7c7",
            "#dbdb8d",
            "#9edae5",
            "#393b79",
            "#637939",
            "#8c6d31",
            "#843c39",
            "#7b4173",
            "#5254a3",
            "#8ca252",
            "#bd9e39",
            "#ad494a",
            "#a55194",
        ],
        # 옵션3: 더 선명하고 구별하기 쉬운 사용자 정의 팔레트
        "vivid": [
            "#FF0000",
            "#00FF00",
            "#0000FF",
            "#FFFF00",
            "#FF00FF",
            "#00FFFF",
            "#800080",
            "#FFA500",
            "#008000",
            "#FF69B4",
            "#4169E1",
            "#DC143C",
            "#32CD32",
            "#FFD700",
            "#9932CC",
            "#FF4500",
            "#2E8B57",
            "#FF1493",
            "#1E90FF",
            "#228B22",
            "#B22222",
            "#4682B4",
            "#D2691E",
            "#9370DB",
            "#20B2AA",
            "#F4A460",
            "#8B008B",
            "#556B2F",
            "#CD5C5C",
            "#40E0D0",
        ],
    }
    selected_palette = color_palette_options[pallet_type]

    # 클러스터 수 확인 및 필요에 따라 팔레트 확장
    unique_clusters = df["cluster-id"].unique()
    num_clusters = len(unique_clusters)

    if num_clusters > len(selected_palette):
        # 팔레트가 부족하면 반복하여 사용
        extended_palette = (
            selected_palette * ((num_clusters // len(selected_palette)) + 1)
        )[:num_clusters]
        selected_palette = extended_palette

    fig = px.scatter(
        df,
        x="x",
        y="y",
        color="cluster-id",
        color_discrete_sequence=selected_palette,
        hover_data=["topic", "cluster-id", "label"],
        title=config["chart"]["title"],
        opacity=0.8,
        size_max=12,
        # 카테고리 순서를 명시적으로 지정
        category_orders={
            "cluster-id": sorted(
                unique_clusters, key=lambda x: int(x) if x != "-1" else -1
            )
        },
    )

    # 마커 경계선 추가하여 구별하기 쉽게 함
    fig.update_traces(
        marker=dict(
            line=dict(width=0.8, color="white"),  # 경계선을 약간 두껍게 하여 시인성 향상
            size=7,  # 마커 크기를 약간 크게
        )
    )

    # cluster-id를 숫자로 되돌려 그룹화 처리
    df["cluster-id-numeric"] = df["cluster-id"].astype(int)

    cluster_centers = (
        df.groupby("cluster-id-numeric")
        .agg(
            x=("x", "mean"),
            y=("y", "mean"),
            label=(
                "label",
                lambda x: x.mode()[0] if not x.mode().empty else "",
            ),
            count=("cluster-id-numeric", "count"),  # 클러스터 크기도 가져옴
        )
        .reset_index()
    )

    # 클러스터 크기에 따라 라벨 표시를 선택적으로 수행
    cluster_centers = cluster_centers.sort_values("count", ascending=False)

    # 동적으로 라벨 크기와 yshift 조정
    max_clusters_to_label = min(20, len(cluster_centers))

    for i, row in cluster_centers.head(max_clusters_to_label).iterrows():
        if row["cluster-id-numeric"] == -1:
            continue

        # 라벨 텍스트를 더 짧게 조정
        label_text = row["label"]
        if len(label_text) > 12:
            label_text = label_text[:9] + "..."

        cluster_label = f"C{row['cluster-id-numeric']}: {label_text}"

        # 클러스터 크기에 따라 폰트 크기 조정
        font_size = max(9, min(12, 8 + row["count"] // 10))

        # 중복을 피하기 위해 무작위 오프셋 추가
        np.random.seed(int(row["cluster-id-numeric"]))  # 재현성을 위해 시드 고정
        x_offset = np.random.uniform(-0.1, 0.1)
        y_offset = np.random.uniform(0.1, 0.3)

        fig.add_annotation(
            x=row["x"] + x_offset,
            y=row["y"] + y_offset,
            text=cluster_label,
            showarrow=True,
            arrowhead=2,
            arrowsize=1,
            arrowwidth=1,
            arrowcolor="rgba(0, 0, 0, 0.6)",
            ax=0,
            ay=-20,
            font=dict(
                size=font_size,
                color="black",
                family="IPAexGothic, Noto Sans CJK JP, sans-serif",
                weight="bold",
            ),
            bgcolor="rgba(255, 255, 255, 0.95)",
            bordercolor="rgba(0, 0, 0, 0.5)",
            borderwidth=1.5,
            xanchor="center",
            yanchor="top",
        )

    # 나머지 작은 클러스터에는 번호만 표시
    for i, row in cluster_centers.tail(
        len(cluster_centers) - max_clusters_to_label
    ).iterrows():
        if row["cluster-id-numeric"] == -1:
            continue

        fig.add_annotation(
            x=row["x"],
            y=row["y"],
            text=f"C{row['cluster-id-numeric']}",
            showarrow=False,
            font=dict(
                size=8,
                color="white",
                family="IPAexGothic, Noto Sans CJK JP, sans-serif",
                weight="bold",
            ),
            bgcolor="rgba(0, 0, 0, 0.8)",
            bordercolor="rgba(255, 255, 255, 0.9)",
            borderwidth=1.5,
            xanchor="center",
            yanchor="middle",
        )

    fig.update_layout(
        font=dict(family="IPAexGothic, Noto Sans CJK JP, sans-serif", size=12),
        plot_bgcolor="white",
        paper_bgcolor="white",
        width=1080,
        height=610,
        legend=dict(
            orientation="v",
            yanchor="top",
            y=1,
            xanchor="left",
            x=1.01,
            bgcolor="rgba(255, 255, 255, 0.95)",
            bordercolor="rgba(0, 0, 0, 0.4)",
            borderwidth=1.5,
            itemsizing="constant",
            font=dict(size=10, weight="bold"),
            # 범례 제목 추가
            title=dict(text="Cluster ID", font=dict(size=12, weight="bold")),
        ),
        margin=dict(l=60, r=200, t=100, b=60),
        title=dict(font=dict(size=16, weight="bold"), x=0.5, xanchor="center"),
    )

    # 축 그리드 삭제
    fig.update_xaxes(showgrid=False)
    fig.update_yaxes(showgrid=False)

    # 줌 기능 활성화로 상세 확인 용이
    fig.update_layout(xaxis=dict(fixedrange=False), yaxis=dict(fixedrange=False))

    return fig


def chart(
    config: dict,
    pdf: pd.DataFrame,
    ldf: pd.DataFrame,
    full_html: bool,
    pallet_type: Literal[
        "colorblind_friendly", "vivid", "high_contrast"
    ] = "colorblind_friendly",
) -> str:
    fig = _create_scatter_plot(config, pdf, ldf, pallet_type)
    return fig.to_html(full_html=full_html)

컬러 팔레트나 레이아웃을 편집하면 더 보기 좋은 그래프를 만들 수 있으니, 목적과 표현 방식에 맞춰 조정하시면 좋을 것 같습니다.

All

데이터 추출부터 그래프 생성까지 모든 처리를 한 번에 하고 싶을 때 all 명령어를 사용하면 좋습니다. 초기 처리나 정기적인 처리 시 유용합니다. 클러스터 수 조정과 같이 세부적인 수정이 필요할 때는 개별 처리 단계를 사용하는 것이 좋습니다.

분석 결과

Plotly 기능을 사용하여 페이지로 출력하려면 full_htmlTrue로 설정하고, 페이지 내에 포함시키려면 full_htmlFalse로 설정하세요. 이렇게 하면 분석 결과를 파일로 공유하거나 HTML에 포함하여 웹에 공개할 수 있습니다. 이 사이트의 경우 Astro를 사용하므로, MDX로 본 게시물을 작성하고 결과 HTML을 별도의 컴포넌트로 저장한 다음 import하여 표시합니다.

보시다시피, 2023년 7월부터 2025년 5월 말까지 다양한 활동을 수행했으며, 그 활동들이 자연스럽게 클러스터로 분류된 모습을 확인할 수 있습니다. 개별 점에 커서를 올리면 실제 수행한 내용을 볼 수 있어, 각 활동이 무엇이었는지 파악하거나 활동들 간의 연관성을 판단하기 용이합니다. 브로드 리스닝은 수많은 목소리를 분류하고 개별적으로 또는 전체적으로 볼 수 있는 기법이므로, 도쿄도 사례처럼 의견이나 설문조사를 분석하는 것 외에도 Bluesky 게시물 분석 등에도 활용할 수 있습니다. 다만, 목적과 방향성을 가지고 데이터를 전처리하고 분할하는 것이 중요하며, 이를 위해 사전에 준비된 데이터의 경향, 방향성, 주제 등을 파악하는 것도 필수적입니다.

요약

가까운 텍스트를 자동으로 수집하여 시각화하는 일련의 과정을 살펴보았습니다. 브로드 리스닝은 텍스트 준비나 생성형 AI 활용 등으로 어렵게 느껴질 수 있지만, 사람이 수작업으로 처리하면 매우 번거로운 절차를 AI로 쉽게 판단하고 분류할 수 있음을 체험할 수 있을 것입니다. 또한, Bluesky처럼 본인이 소유한 텍스트의 경우 재공개를 포함하여 유연하게 대응할 수 있습니다.

개인 SNS 게시물이나 메모 등이라면 쉽게 분석하고 재공개할 수 있으며, 공적으로나 사적으로 분석 및 개선을 명확하게 수행할 수 있는 유용하고 활용도 높은 기술 및 분석 방법이므로, 브로드 리스닝 기법을 본인이 소유한 텍스트에 적용해 보시길 권합니다.

관련 글