RAGFlow 완전 분석: DB 스키마부터 아키텍처까지 깊이 있는 이해

Contents

서론: RAGFlow를 제대로 이해하기 위한 여정

RAGFlow는 문서 이해 기반의 오픈소스 RAG(Retrieval-Augmented Generation) 엔진으로, 복잡한 형식의 데이터에서 진실한 질의응답 기능을 제공합니다. 단순히 사용법만 아는 것을 넘어서 RAGFlow의 내부 구조와 데이터베이스 스키마를 이해한다면, 이 강력한 도구를 더욱 효과적으로 활용할 수 있습니다.

본 포스팅에서는 RAGFlow의 핵심인 데이터베이스 스키마, 디렉토리 구조, 그리고 애플리케이션 실행 과정을 상세히 분석해보겠습니다.

RAGFlow 시스템 아키텍처 개요

전체적인 시스템 구성

RAGFlow는 마이크로서비스 아키텍처를 기반으로 설계되었으며, 다음과 같은 주요 컴포넌트들로 구성됩니다:

핵심 서비스 컴포넌트:

  • RAGFlow Server: 메인 애플리케이션 서버 (Python/Flask 기반)
  • Task Executor: 백그라운드 작업 처리 엔진
  • DeepDoc: 문서 파싱 및 분석 엔진
  • Web Frontend: React 기반 사용자 인터페이스

데이터 저장소:

  • MySQL: 메타데이터 및 관계형 데이터
  • Elasticsearch/Infinity: 벡터 검색 및 전문 검색
  • MinIO: 파일 스토리지 (S3 호환)
  • Redis: 캐싱 및 세션 관리

데이터 흐름과 처리 과정

[문서 업로드] → [DeepDoc 파싱] → [청킹 및 임베딩] → [벡터DB 저장]
                                            ↓
[사용자 질의] → [검색 엔진] → [컨텍스트 수집] → [LLM 응답 생성]

데이터베이스 스키마 심층 분석

MySQL 스키마 구조

RAGFlow는 MySQL을 메인 관계형 데이터베이스로 사용하며, 다음과 같은 주요 테이블들을 포함합니다:

사용자 관리 테이블

users 테이블

CREATE TABLE users (
    id VARCHAR(32) PRIMARY KEY,
    email VARCHAR(256) NOT NULL UNIQUE,
    password VARCHAR(256) NOT NULL,
    nickname VARCHAR(32),
    avatar TEXT,
    language VARCHAR(32) DEFAULT 'English',
    timezone VARCHAR(64) DEFAULT 'Asia/Shanghai',
    is_superuser BOOLEAN DEFAULT FALSE,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    login_time TIMESTAMP,
    is_active BOOLEAN DEFAULT TRUE
);

tenants 테이블

CREATE TABLE tenants (
    id VARCHAR(32) PRIMARY KEY,
    name VARCHAR(128) NOT NULL,
    llm TEXT,                    -- LLM 설정 (JSON)
    embd_id VARCHAR(64),        -- 임베딩 모델 ID
    img2txt_id VARCHAR(64),     -- 이미지-텍스트 모델 ID
    asr_id VARCHAR(64),         -- ASR 모델 ID
    parser_ids VARCHAR(1024),   -- 파서 ID 목록
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

지식베이스 관리 테이블

knowledgebases 테이블

CREATE TABLE knowledgebases (
    id VARCHAR(32) PRIMARY KEY,
    tenant_id VARCHAR(32) NOT NULL,
    name VARCHAR(128) NOT NULL,
    avatar VARCHAR(1024),
    description TEXT,
    language VARCHAR(32) DEFAULT 'English',
    embd_id VARCHAR(64),
    parser_id VARCHAR(64),
    parser_config TEXT,         -- 파서 설정 (JSON)
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    chunk_num INTEGER DEFAULT 0,
    doc_num INTEGER DEFAULT 0,
    token_num INTEGER DEFAULT 0,
    FOREIGN KEY (tenant_id) REFERENCES tenants(id)
);

documents 테이블

CREATE TABLE documents (
    id VARCHAR(32) PRIMARY KEY,
    kb_id VARCHAR(32) NOT NULL,
    parser_id VARCHAR(64),
    parser_config TEXT,
    name VARCHAR(256) NOT NULL,
    type VARCHAR(64),           -- PDF, DOCX, TXT 등
    size INTEGER DEFAULT 0,
    token_num INTEGER DEFAULT 0,
    chunk_num INTEGER DEFAULT 0,
    progress FLOAT DEFAULT 0.0,
    progress_msg TEXT,
    process_begin_at TIMESTAMP,
    process_duation FLOAT DEFAULT 0.0,
    run ENUM('0', '1', '2', '3') DEFAULT '1',  -- 0: 실패, 1: 진행중, 2: 취소됨, 3: 완료
    status ENUM('0', '1') DEFAULT '1',         -- 0: 비활성, 1: 활성
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (kb_id) REFERENCES knowledgebases(id)
);

청크 및 임베딩 관리

document_chunks 테이블

CREATE TABLE document_chunks (
    id VARCHAR(32) PRIMARY KEY,
    doc_id VARCHAR(32) NOT NULL,
    kb_id VARCHAR(32) NOT NULL,
    img_id VARCHAR(256),        -- 이미지 청크의 경우
    content_with_weight TEXT,   -- 가중치가 포함된 콘텐츠
    content_ltks TEXT,          -- 토큰화된 콘텐츠
    content_sm_ltks TEXT,       -- 요약된 토큰 콘텐츠
    important_kwd TEXT,         -- 중요 키워드
    title_tks TEXT,             -- 제목 토큰
    page_num INTEGER DEFAULT -1,
    positions TEXT,             -- 문서 내 위치 정보 (JSON)
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    FOREIGN KEY (doc_id) REFERENCES documents(id),
    FOREIGN KEY (kb_id) REFERENCES knowledgebases(id)
);

대화 및 세션 관리

conversations 테이블

CREATE TABLE conversations (
    id VARCHAR(32) PRIMARY KEY,
    dialog_id VARCHAR(32) NOT NULL,
    user_id VARCHAR(32) NOT NULL,
    message TEXT NOT NULL,
    reference TEXT,             -- 참조 문서 정보 (JSON)
    tokens INTEGER DEFAULT 0,
    duration FLOAT DEFAULT 0.0,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    FOREIGN KEY (user_id) REFERENCES users(id)
);

dialogs 테이블

CREATE TABLE dialogs (
    id VARCHAR(32) PRIMARY KEY,
    name VARCHAR(256) NOT NULL,
    description TEXT,
    icon VARCHAR(1024),
    top_n INTEGER DEFAULT 6,     -- 검색 시 상위 N개 결과
    top_k INTEGER DEFAULT 1024,  -- 토큰 수 제한
    rerank_id VARCHAR(64),       -- 리랭킹 모델 ID
    llm TEXT,                    -- LLM 설정 (JSON)
    prompt TEXT,                 -- 시스템 프롬프트
    similarity_threshold FLOAT DEFAULT 0.2,
    vector_similarity_weight FLOAT DEFAULT 0.3,
    create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    update_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

테이블 간 관계 (ERD)

erDiagram
    users ||--|| tenants : belongs_to
    tenants ||--o{ knowledgebases : owns
    knowledgebases ||--o{ documents : contains
    documents ||--o{ document_chunks : has
    users ||--o{ conversations : creates
    conversations }o--|| dialogs : belongs_to
    dialogs }o--o{ knowledgebases : uses
    
    users {
        string id PK
        string email UK
        string password
        string nickname
        boolean is_superuser
        timestamp create_time
    }
    
    tenants {
        string id PK
        string name
        text llm
        string embd_id
        timestamp create_time
    }
    
    knowledgebases {
        string id PK
        string tenant_id FK
        string name
        string embd_id
        integer chunk_num
        integer doc_num
    }
    
    documents {
        string id PK
        string kb_id FK
        string name
        string type
        integer size
        float progress
        enum run
    }
    
    document_chunks {
        string id PK
        string doc_id FK
        string kb_id FK
        text content_with_weight
        text positions
        integer page_num
    }
    
    conversations {
        string id PK
        string dialog_id FK
        string user_id FK
        text message
        text reference
        integer tokens
    }
    
    dialogs {
        string id PK
        string name
        integer top_n
        integer top_k
        text llm
        text prompt
    }

벡터 데이터베이스 스키마 (Elasticsearch/Infinity)

RAGFlow는 벡터 검색을 위해 Elasticsearch 또는 Infinity 데이터베이스를 사용합니다.

Elasticsearch 인덱스 구조:

{
  "mappings": {
    "properties": {
      "id": {"type": "keyword"},
      "doc_id": {"type": "keyword"},
      "kb_id": {"type": "keyword"},
      "content_with_weight": {"type": "text"},
      "content_ltks": {"type": "text"},
      "title_tks": {"type": "text"},
      "important_kwd": {"type": "text"},
      "img_id": {"type": "keyword"},
      "page_num": {"type": "integer"},
      "positions": {"type": "text"},
      "q_768_vec": {
        "type": "dense_vector",
        "dims": 768
      },
      "q_1024_vec": {
        "type": "dense_vector", 
        "dims": 1024
      },
      "create_time": {"type": "date"},
      "update_time": {"type": "date"}
    }
  }
}

디렉토리 구조 상세 분석

전체 디렉토리 구조

ragflow/
├── api/                    # API 라우터 및 엔드포인트
├── conf/                   # 설정 파일들
├── deepdoc/               # 문서 파싱 및 분석 엔진
├── docker/                # Docker 설정 파일들
├── rag/                   # RAG 핵심 로직
├── web/                   # React 프론트엔드
├── tests/                 # 테스트 코드
├── graphrag/              # Graph RAG 구현
└── agent/                 # AI 에이전트 기능

핵심 디렉토리별 상세 분석

api/ 디렉토리

api/
├── apps/                  # 애플리케이션별 API 그룹
│   ├── sdk_app.py        # SDK API
│   ├── dialog_app.py     # 대화 관련 API
│   ├── document_app.py   # 문서 관리 API
│   ├── knowledgebase_app.py  # 지식베이스 API
│   └── user_app.py       # 사용자 관리 API
├── utils/                # API 유틸리티
├── db/                   # 데이터베이스 모델 및 연결
│   ├── models.py         # SQLAlchemy 모델 정의
│   ├── init_data.py      # 초기 데이터 세팅
│   └── services/         # 비즈니스 로직 서비스
├── settings.py           # API 설정
└── versions.py           # API 버전 관리

주요 파일 분석:

api/db/models.py – 데이터베이스 모델 정의

class User(SQLModel, table=True):
    __tablename__ = "user"
    
    id: str = Field(primary_key=True)
    email: str = Field(unique=True)
    password: str
    nickname: Optional[str] = None
    avatar: Optional[str] = None
    language: str = "English"
    timezone: str = "Asia/Shanghai"
    is_superuser: bool = False
    create_time: datetime
    update_time: datetime
    login_time: Optional[datetime] = None
    is_active: bool = True

class Knowledgebase(SQLModel, table=True):
    __tablename__ = "knowledgebase"
    
    id: str = Field(primary_key=True)
    tenant_id: str = Field(foreign_key="tenant.id")
    name: str
    avatar: Optional[str] = None
    description: Optional[str] = None
    language: str = "English"
    embd_id: Optional[str] = None
    parser_id: Optional[str] = None
    parser_config: Optional[str] = None
    chunk_num: int = 0
    doc_num: int = 0
    token_num: int = 0

deepdoc/ 디렉토리

DeepDoc은 복잡한 포맷의 비구조화된 데이터에서 지식 추출을 담당하는 핵심 컴포넌트입니다.

deepdoc/
├── parser/               # 문서 파서들
│   ├── pdf_parser.py    # PDF 파싱
│   ├── docx_parser.py   # Word 문서 파싱
│   ├── excel_parser.py  # Excel 파싱
│   ├── ppt_parser.py    # PowerPoint 파싱
│   └── html_parser.py   # HTML 파싱
├── vision/              # 컴퓨터 비전 모델
│   ├── layout_recognizer.py  # 레이아웃 인식
│   ├── table_detector.py     # 테이블 감지
│   └── ocr.py               # OCR 처리
├── chunker/             # 텍스트 청킹 전략
│   ├── naive_chunker.py     # 기본 청킹
│   ├── manual_chunker.py    # 수동 청킹
│   └── paragraph_chunker.py # 단락 기반 청킹
└── utils/               # 유틸리티 함수들

rag/ 디렉토리

RAG의 핵심 로직이 구현된 디렉토리입니다.

rag/
├── nlp/                 # 자연어 처리 모듈
│   ├── search.py       # 검색 로직
│   ├── rag.py          # RAG 메인 로직
│   ├── keyword.py      # 키워드 추출
│   └── entity.py       # 개체명 인식
├── llm/                # LLM 통합
│   ├── chat_model.py   # 채팅 모델 인터페이스
│   ├── embedding_model.py  # 임베딩 모델
│   └── rerank_model.py     # 리랭킹 모델
├── utils/              # RAG 유틸리티
└── settings.py         # RAG 설정

web/ 디렉토리 (프론트엔드)

web/
├── src/
│   ├── components/     # React 컴포넌트
│   ├── pages/          # 페이지 컴포넌트
│   ├── hooks/          # 커스텀 훅
│   ├── interfaces/     # TypeScript 인터페이스
│   ├── utils/          # 유틸리티 함수
│   └── constants/      # 상수 정의
├── public/             # 정적 파일
├── package.json        # npm 설정
└── tsconfig.json      # TypeScript 설정

애플리케이션 실행 과정 분석

시스템 초기화 과정

1단계: Docker 컨테이너 시작

docker compose -f docker-compose.yml up -d

시스템 시작 시 다음 순서로 서비스들이 초기화됩니다:

  1. MySQL: 데이터베이스 서비스 시작
  2. Redis: 캐싱 서비스 초기화
  3. MinIO: 객체 스토리지 서비스
  4. Elasticsearch: 검색 엔진 서비스
  5. RAGFlow Server: 메인 애플리케이션

2단계: 데이터베이스 초기화

conf/db_init.py에서 데이터베이스 스키마 생성:

def database_init():
    # 테이블 생성
    create_tables()
    
    # 기본 데이터 삽입
    init_superuser()
    init_default_llm()
    init_parser_models()
    
    # 인덱스 생성
    create_indexes()

주요 초기화 작업:

  • 모든 테이블 스키마 생성
  • 기본 관리자 계정 생성
  • 기본 LLM 모델 설정
  • 파서 모델 등록
  • Elasticsearch 인덱스 생성

3단계: 애플리케이션 서버 시작

ragflow_server.py가 메인 엔트리포인트 역할:

def main():
    # 설정 로드
    load_configurations()
    
    # 데이터베이스 연결
    init_database_connections()
    
    # Flask 앱 초기화
    app = create_app()
    
    # API 라우터 등록
    register_blueprints(app)
    
    # 서버 시작
    app.run(host='0.0.0.0', port=9380)

요청 처리 흐름

문서 업로드 및 처리 과정

sequenceDiagram
    participant U as User
    participant W as Web Frontend
    participant A as API Server
    participant D as DeepDoc
    participant E as Elasticsearch
    participant M as MySQL
    participant S as MinIO

    U->>W: 문서 업로드
    W->>A: POST /api/v1/documents
    A->>M: 문서 메타데이터 저장
    A->>S: 원본 파일 저장
    A->>D: 문서 파싱 요청
    
    Note over D: 문서 분석 및 청킹
    D->>D: 레이아웃 인식
    D->>D: 텍스트 추출
    D->>D: 청크 분할
    D->>D: 임베딩 생성
    
    D->>M: 청크 메타데이터 저장
    D->>E: 벡터 및 텍스트 저장
    D->>A: 처리 완료 응답
    A->>W: 성공 응답
    W->>U: 업로드 완료 표시

RAG 질의응답 과정

sequenceDiagram
    participant U as User
    participant W as Web Frontend  
    participant A as API Server
    participant R as RAG Engine
    participant E as Elasticsearch
    participant L as LLM Service

    U->>W: 질문 입력
    W->>A: POST /api/v1/conversation
    A->>R: 질의 처리 요청
    
    R->>R: 질의 분석 및 키워드 추출
    R->>E: 유사도 검색 수행
    E->>R: 관련 청크 반환
    R->>R: 컨텍스트 구성
    R->>L: LLM 응답 생성 요청
    L->>R: 생성된 응답
    
    R->>A: 최종 응답 (출처 포함)
    A->>W: JSON 응답
    W->>U: 답변 표시

백그라운드 작업 처리

task_executor.py는 비동기 작업들을 처리합니다:

class TaskExecutor:
    def __init__(self):
        self.task_queue = Queue()
        self.workers = []
    
    def start_workers(self, num_workers=4):
        for i in range(num_workers):
            worker = TaskWorker(self.task_queue)
            worker.start()
            self.workers.append(worker)
    
    def add_task(self, task_type, **kwargs):
        task = {
            'type': task_type,
            'data': kwargs,
            'timestamp': time.time()
        }
        self.task_queue.put(task)

주요 백그라운드 작업:

  • 문서 파싱 및 임베딩
  • 인덱스 재구축
  • 통계 데이터 업데이트
  • 파일 정리 작업

성능 최적화 전략

데이터베이스 최적화

인덱스 전략:

-- 검색 성능 향상을 위한 인덱스
CREATE INDEX idx_chunks_kb_id ON document_chunks(kb_id);
CREATE INDEX idx_chunks_doc_id ON document_chunks(doc_id);
CREATE INDEX idx_documents_kb_id ON documents(kb_id);
CREATE INDEX idx_conversations_dialog_id ON conversations(dialog_id);

-- 복합 인덱스
CREATE INDEX idx_chunks_kb_doc ON document_chunks(kb_id, doc_id);
CREATE INDEX idx_docs_kb_status ON documents(kb_id, status, run);

쿼리 최적화:

  • 배치 삽입을 통한 대량 데이터 처리
  • 페이지네이션을 통한 메모리 사용량 제어
  • 캐싱 레이어를 통한 반복 쿼리 최적화

벡터 검색 최적화

Elasticsearch 설정:

{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "30s",
    "index.codec": "best_compression"
  }
}

검색 성능 향상:

  • ANN(Approximate Nearest Neighbor) 알고리즘 활용
  • 임베딩 차원 최적화
  • 하이브리드 검색(벡터 + 키워드) 구현

보안 및 권한 관리

데이터 보안

암호화 전략:

  • 사용자 비밀번호: bcrypt 해싱
  • API 키: AES-256 암호화
  • 파일 저장: MinIO 서버사이드 암호화

접근 제어:

def check_kb_permission(user_id, kb_id, permission='read'):
    """지식베이스 접근 권한 확인"""
    kb = get_knowledgebase(kb_id)
    if not kb:
        return False
    
    tenant = get_tenant(kb.tenant_id)
    return user_has_tenant_access(user_id, tenant.id, permission)

API 보안

인증 및 인가:

  • JWT 토큰 기반 인증
  • Role-based Access Control (RBAC)
  • Rate limiting을 통한 API 남용 방지

모니터링 및 로깅

시스템 모니터링

주요 메트릭:

  • 문서 처리 성능 (처리 시간, 처리량)
  • 검색 응답 시간
  • 메모리 및 CPU 사용량
  • 데이터베이스 연결 상태

로그 관리

로그 레벨 구성:

LOGGING_CONFIG = {
    'version': 1,
    'formatters': {
        'default': {
            'format': '[%(asctime)s] %(levelname)s in %(module)s: %(message)s',
        }
    },
    'handlers': {
        'file': {
            'class': 'logging.handlers.RotatingFileHandler',
            'filename': 'logs/ragflow.log',
            'maxBytes': 50000000,  # 50MB
            'backupCount': 5,
            'formatter': 'default'
        }
    }
}

확장성 고려사항

수평 확장

마이크로서비스 분리:

  • 문서 처리 서비스 독립화
  • 검색 서비스 스케일링
  • API Gateway를 통한 로드 밸런싱

데이터 파티셔닝

샤딩 전략:

def get_shard_key(kb_id):
    """지식베이스 ID 기반 샤딩"""
    return hash(kb_id) % NUM_SHARDS

def get_es_index_name(kb_id):
    """샤드별 Elasticsearch 인덱스"""
    shard = get_shard_key(kb_id)
    return f"ragflow_chunks_shard_{shard}"

실제 운영 환경에서의 고려사항

배포 전략

Blue-Green 배포:

  • 무중단 서비스 업데이트
  • 롤백 능력 확보
  • 데이터베이스 마이그레이션 자동화

백업 및 복구

데이터 백업 전략:

  • MySQL: 정기적인 덤프 및 바이너리 로그 백업
  • Elasticsearch: 스냅샷을 통한 인덱스 백업
  • MinIO: 객체 스토리지 복제 설정

개발 환경 구축 가이드

로컬 개발 환경

개발 서버 실행:

# 백엔드 의존성 설치
uv sync --python 3.10 --all-extras

# 의존 서비스 시작
docker compose -f docker/docker-compose-base.yml up -d

# 백엔드 서비스 시작
source .venv/bin/activate
export PYTHONPATH=$(pwd)
bash docker/launch_backend_service.sh

# 프론트엔드 개발 서버
cd web
npm install
npm run dev

개발용 설정:

# conf/development.py
class DevelopmentConfig:
    DEBUG = True
    SQLALCHEMY_ECHO = True  # SQL 쿼리 로깅
    
    # 개발용 데이터베이스
    DATABASE_URL = "mysql://root:infiniflow@localhost:3306/rag"
    
    # 로컬 MinIO 설정
    MINIO_ENDPOINT = "localhost:9000"
    MINIO_ACCESS_KEY = "minioadmin"
    MINIO_SECRET_KEY = "minioadmin"
    
    # Elasticsearch 설정
    ELASTICSEARCH_HOST = "localhost:9200"

디버깅 및 프로파일링

성능 프로파일링:

from functools import wraps
import time
import cProfile

def profile_function(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        pr = cProfile.Profile()
        pr.enable()
        result = func(*args, **kwargs)
        pr.disable()
        pr.dump_stats(f'profile_{func.__name__}.prof')
        return result
    return wrapper

@profile_function
def process_document(doc_path, kb_id):
    """문서 처리 함수 프로파일링"""
    # 문서 처리 로직
    pass

고급 기능 및 확장 포인트

커스텀 파서 개발

RAGFlow의 파서 시스템은 확장 가능한 구조로 설계되어 있습니다:

# deepdoc/parser/base_parser.py
class BaseParser:
    def __init__(self, config):
        self.config = config
    
    def parse(self, file_path, **kwargs):
        """파일 파싱 메인 메서드"""
        raise NotImplementedError
    
    def chunk(self, content, **kwargs):
        """콘텐츠 청킹"""
        raise NotImplementedError

# 커스텀 파서 예제
class CustomXMLParser(BaseParser):
    def parse(self, file_path, **kwargs):
        # XML 파일 파싱 로직
        import xml.etree.ElementTree as ET
        tree = ET.parse(file_path)
        root = tree.getroot()
        
        content = []
        for elem in root.iter():
            if elem.text:
                content.append({
                    'text': elem.text,
                    'tag': elem.tag,
                    'attributes': elem.attrib
                })
        
        return content

벡터 임베딩 모델 통합

커스텀 임베딩 모델 추가:

# rag/llm/embedding_model.py
class CustomEmbeddingModel:
    def __init__(self, model_path, device='cpu'):
        self.model_path = model_path
        self.device = device
        self.model = self.load_model()
    
    def load_model(self):
        # 모델 로딩 로직
        pass
    
    def encode(self, texts, batch_size=32):
        """텍스트를 벡터로 변환"""
        embeddings = []
        for i in range(0, len(texts), batch_size):
            batch = texts[i:i+batch_size]
            batch_embeddings = self._encode_batch(batch)
            embeddings.extend(batch_embeddings)
        return embeddings
    
    def _encode_batch(self, texts):
        # 배치 단위 인코딩
        pass

GraphRAG 기능

RAGFlow는 GraphRAG 기능도 지원합니다:

# graphrag/graph_builder.py
class KnowledgeGraphBuilder:
    def __init__(self, llm_client, embedding_model):
        self.llm_client = llm_client
        self.embedding_model = embedding_model
    
    def build_graph(self, chunks):
        """청크들로부터 지식 그래프 구축"""
        entities = self.extract_entities(chunks)
        relationships = self.extract_relationships(chunks, entities)
        
        graph = {
            'entities': entities,
            'relationships': relationships
        }
        
        return graph
    
    def extract_entities(self, chunks):
        """개체 추출"""
        entities = {}
        for chunk in chunks:
            chunk_entities = self.llm_client.extract_entities(chunk['content'])
            for entity in chunk_entities:
                if entity['name'] not in entities:
                    entities[entity['name']] = {
                        'type': entity['type'],
                        'description': entity['description'],
                        'chunks': []
                    }
                entities[entity['name']]['chunks'].append(chunk['id'])
        
        return entities

트러블슈팅 가이드

일반적인 문제와 해결책

문제 1: 문서 파싱 실패

# 해결책: 로그를 통한 문제 진단
def debug_parsing_error(doc_id):
    doc = get_document(doc_id)
    
    # 파일 존재 확인
    if not os.path.exists(doc.path):
        logger.error(f"File not found: {doc.path}")
        return
    
    # 파일 형식 확인
    file_type = detect_file_type(doc.path)
    if file_type != doc.type:
        logger.warning(f"File type mismatch: expected {doc.type}, got {file_type}")
    
    # 파서 설정 확인
    parser_config = json.loads(doc.parser_config)
    logger.info(f"Parser config: {parser_config}")
    
    # 메모리 사용량 확인
    memory_usage = get_memory_usage()
    if memory_usage > 0.9:
        logger.warning("High memory usage detected")

문제 2: 검색 성능 저하

def optimize_search_performance(kb_id):
    # 인덱스 상태 확인
    es_client = get_elasticsearch_client()
    index_name = get_index_name(kb_id)
    
    index_stats = es_client.indices.stats(index=index_name)
    logger.info(f"Index stats: {index_stats}")
    
    # 세그먼트 병합
    es_client.indices.forcemerge(index=index_name, max_num_segments=1)
    
    # 캐시 클리어
    es_client.indices.clear_cache(index=index_name)

성능 모니터링

시스템 메트릭 수집:

import psutil
import threading
import time

class SystemMonitor:
    def __init__(self):
        self.metrics = {}
        self.running = False
    
    def start_monitoring(self):
        self.running = True
        thread = threading.Thread(target=self._collect_metrics)
        thread.daemon = True
        thread.start()
    
    def _collect_metrics(self):
        while self.running:
            self.metrics.update({
                'cpu_percent': psutil.cpu_percent(interval=1),
                'memory_percent': psutil.virtual_memory().percent,
                'disk_usage': psutil.disk_usage('/').percent,
                'active_connections': get_active_db_connections(),
                'es_query_time': get_avg_es_query_time(),
                'timestamp': time.time()
            })
            time.sleep(60)  # 1분마다 수집

API 활용 가이드

RESTful API 엔드포인트

주요 API 엔드포인트:

# 지식베이스 관리
POST   /api/v1/knowledgebases          # 지식베이스 생성
GET    /api/v1/knowledgebases          # 지식베이스 목록
PUT    /api/v1/knowledgebases/{kb_id}  # 지식베이스 수정
DELETE /api/v1/knowledgebases/{kb_id}  # 지식베이스 삭제

# 문서 관리
POST   /api/v1/documents               # 문서 업로드
GET    /api/v1/documents               # 문서 목록
DELETE /api/v1/documents/{doc_id}      # 문서 삭제

# 대화 관리
POST   /api/v1/conversations           # 대화 생성
GET    /api/v1/conversations/{conv_id} # 대화 조회
POST   /api/v1/conversations/{conv_id}/messages  # 메시지 전송

API 사용 예제:

import requests

class RAGFlowClient:
    def __init__(self, base_url, api_key):
        self.base_url = base_url
        self.headers = {'Authorization': f'Bearer {api_key}'}
    
    def create_knowledgebase(self, name, description=""):
        """지식베이스 생성"""
        data = {
            'name': name,
            'description': description,
            'language': 'English'
        }
        response = requests.post(
            f"{self.base_url}/api/v1/knowledgebases",
            json=data,
            headers=self.headers
        )
        return response.json()
    
    def upload_document(self, kb_id, file_path):
        """문서 업로드"""
        with open(file_path, 'rb') as f:
            files = {'file': f}
            data = {'kb_id': kb_id}
            response = requests.post(
                f"{self.base_url}/api/v1/documents",
                files=files,
                data=data,
                headers=self.headers
            )
        return response.json()
    
    def ask_question(self, dialog_id, question):
        """질문하기"""
        data = {
            'question': question,
            'stream': False
        }
        response = requests.post(
            f"{self.base_url}/api/v1/conversations/{dialog_id}/messages",
            json=data,
            headers=self.headers
        )
        return response.json()

# 사용 예제
client = RAGFlowClient("http://localhost", "your-api-key")

# 지식베이스 생성
kb = client.create_knowledgebase("My Knowledge Base")
kb_id = kb['data']['id']

# 문서 업로드
doc = client.upload_document(kb_id, "/path/to/document.pdf")

# 질문하기
answer = client.ask_question(dialog_id, "문서의 주요 내용은 무엇인가요?")
print(answer['data']['answer'])

SDK 개발

Python SDK 구조:

# ragflow_sdk/client.py
class RAGFlowSDK:
    def __init__(self, base_url, api_key):
        self.client = RAGFlowClient(base_url, api_key)
        self.knowledgebases = KnowledgebaseService(self.client)
        self.documents = DocumentService(self.client)
        self.conversations = ConversationService(self.client)
    
    def __enter__(self):
        return self
    
    def __exit__(self, exc_type, exc_val, exc_tb):
        # 정리 작업
        pass

# 사용법
with RAGFlowSDK("http://localhost", "api-key") as rag:
    kb = rag.knowledgebases.create("Test KB")
    doc = rag.documents.upload(kb.id, "test.pdf")
    answer = rag.conversations.ask(dialog_id, "What is this document about?")

커뮤니티 및 기여 가이드

오픈소스 기여 방법

코드 기여 프로세스:

  1. Fork the repository
  2. Create feature branch
  3. Implement changes
  4. Add tests
  5. Submit pull request

코딩 스타일 가이드:

# .pre-commit-config.yaml
repos:
  - repo: https://github.com/psf/black
    rev: 23.1.0
    hooks:
      - id: black
        language_version: python3
  
  - repo: https://github.com/PyCQA/flake8
    rev: 6.0.0
    hooks:
      - id: flake8
        args: [--max-line-length=88]

버그 리포트 및 기능 요청

이슈 템플릿:

## Bug Report

**Environment:**
- RAGFlow version: 
- OS: 
- Python version: 

**Steps to reproduce:**
1. 
2. 
3. 

**Expected behavior:**

**Actual behavior:**

**Logs:**

결론 및 향후 발전 방향

RAGFlow의 강점

RAGFlow는 다음과 같은 핵심 강점을 가지고 있습니다:

  1. 깊이 있는 문서 이해: DeepDoc 엔진을 통한 복잡한 문서 구조 분석
  2. 유연한 아키텍처: 마이크로서비스 기반의 확장 가능한 구조
  3. 강력한 검색 엔진: 벡터 검색과 전문 검색의 하이브리드 접근
  4. 직관적인 사용자 경험: 시각화된 청킹과 추적 가능한 인용

향후 발전 방향

단기 로드맵:

  • Multi-modal RAG 기능 강화
  • Real-time 협업 기능 추가
  • API 성능 최적화
  • Mobile-friendly 인터페이스

장기 비전:

  • AutoML 기반 파이프라인 최적화
  • Federated Learning 지원
  • Edge Computing 배포 지원
  • Enterprise 기능 확장

실제 도입 시 고려사항

기술적 고려사항:

  1. 하드웨어 요구사항: 최소 16GB RAM, 4 core CPU 필요
  2. 네트워크 대역폭: 대용량 문서 처리 시 충분한 대역폭 확보
  3. 저장공간 계획: 문서 원본, 벡터, 인덱스를 고려한 용량 계획

운영적 고려사항:

  1. 백업 전략: 정기적인 데이터 백업 및 복구 계획 수립
  2. 보안 정책: 민감한 데이터 처리 시 암호화 및 접근 제어
  3. 성능 모니터링: 지속적인 성능 모니터링 및 튜닝

RAGFlow의 데이터베이스 스키마와 아키텍처를 깊이 이해함으로써, 단순한 사용을 넘어서 시스템을 최적화하고 확장할 수 있는 기반을 마련할 수 있습니다. 오픈소스 프로젝트의 특성상 지속적인 발전과 커뮤니티 기여를 통해 더욱 강력한 RAG 솔루션으로 발전할 것으로 기대됩니다.

참고 자료

이 포스팅이 RAGFlow를 더 깊이 이해하고 효과적으로 활용하는 데 도움이 되기를 바랍니다. RAGFlow의 강력한 기능을 최대한 활용하여 여러분의 프로젝트에 적용해보세요.

Leave a Comment