PostgreSQL

PostgreSQL: FDW를 활용한 동기화 Batch

dewstream 2025. 7. 17. 08:00
728x90

※ PostgreSQL: Synchronization Batch Using FDW.

 

안녕하세요. 듀스트림입니다.

 

오늘 포스팅은 Postgres_fdw를 활용하여 데이터를 원천(Remote)에서 로컬로 동기화하는 방법에 대한 내용입니다.


이 구성은 주로 스테이징 DB를 따로 사용하거나, 프론트 영역과 OLAP 영역이 나눠져 있을 경우 나오는 아키텍처입니다.


1. postgres_fdw 개념과 핵심 기능

범주 설명
원격 테이블 매핑 CREATE SERVER → CREATE USER MAPPING → CREATE FOREIGN TABLE만으로 외부 PostgreSQL 테이블을 지역 DB 객체처럼 다룸.
푸시다운 최적화 WHERE‧JOIN‧AGGREGATE를 가능한 한 원격 서버로 밀어 넣어 네트워크 전송량을 최소화. (필요 시 use_remote_estimate 옵션으로 원격 EXPLAIN 통계 사용)
배치 INSERT batch_size(기본 1) 옵션으로 한 번에 여러 행을 전송하여 왕복 RTT를 줄임.
페치 사이즈 fetch_size 옵션으로 커서‑기반 가져오기 배치를 조절해 메모리·네트워크 균형을 맞춤.
비동기 스캔 async_capable=true일 때 Append 하위 계획을 병렬로 실행해 복수 테이블 조회 시 속도를 높임.
병렬 커밋/어보트 parallel_commit/parallel_abort가 여러 원격 트랜잭션을 동시에 종료해 대기 시간을 단축.
• fdw 라이브러리는 로컬 서버에서만 로드됩니다.
• Remote(소스) 쪽은 단순 TCP 접속 대상으로, FDW 확장 자체를 설치할 필요가 없습니다.
• FDW 코드가 동작하는 곳은 로컬 서버 프로세스입니다. Remote(소스) 서버는 FDW와 관계없이 그냥 클라이언트(로컬 서버)가 보내는 쿼리만 처리합니다.

 

+ fdw 옵션 정리

옵션 적용 위치 타입 기본값 설명
schema_name foreign table string foreign schema 원격 객체의 스키마 명 지정
table_name foreign table string foreign table 원격 테이블 명 지정
column_name foreign table column string column name 원격 컬럼명 매핑
use_remote_estimate server / table boolean false 원격 EXPLAIN 사용 여부
fdw_startup_cost server float 100 원격 접속 및 초기 오버헤드
fdw_tuple_cost server float 0.2 레코드 전송 비용
analyze_sampling server / table string auto ANALYZE 수행 방식
extensions server string 원격 실행 허용 확장 목록
fetch_size server / table integer 100 한 번에 가져올 행 수
batch_size server / table integer 1 DML 일괄 처리 시 한번에 전송 행 수
async_capable server / table boolean false 비동기 실행 가능 여부
parallel_commit server boolean false 병렬 커밋 여부
parallel_abort server boolean false 병렬 롤백 여부
updatable server / table boolean true DML 가능 여부
truncatable server / table boolean true TRUNCATE 가능 여부
keep_connections server boolean on 세션 재사용 연결 유지 여부
import_collate IMPORT FOREIGN SCHEMA boolean true COLLATE 포함 여부
import_default IMPORT FOREIGN SCHEMA boolean false DEFAULT 포함 여부
import_generated IMPORT FOREIGN SCHEMA boolean true GENERATED 컬럼 포함 여부
import_not_null IMPORT FOREIGN SCHEMA boolean true NOT NULL 포함 여부

 

++ use_remote_estimate

  • use_remote_estimate = true
    • 원격 서버에 EXPLAIN 명령을 보내 계획 비용 및 행 수를 수집.
    • 이후 fdw_startup_cost + fdw_tuple_cost를 해당 값에 추가.
    • 장점: 정확도 높음, 원격 조인(pushdown) 활용 가능.
    • 단점: EXPLAIN 요청 비용, 네트워크 지연 있음.
  • use_remote_estimate = false
    • 로컬에서 통계 기반 추정 수행(분석된 통계 테이블 사용).
    • 추가로 fdw_startup_cost + fdw_tuple_cost 적용.
    • 비용: 반복 적용된 fdw_tuple_cost로 인해 과다 평가 가능.
    • 단점: 원격 테이블 통계 없거나 오래되면 부정확.
# 플로우 차트

클라이언트 쿼리 → 계획 수립 단계

├─ 서버/표 레벨에 use_remote_estimate 설정?
│       ├─ true
│       │   └→ 원격 서버에 EXPLAIN 실행 → 비용/행 수 받아옴
│       └─ false
│           └→ 로컬 추정 (statistics 또는 default)
│
→ fdw_startup_cost + fdw_tuple_cost 더함
→ 계획 완료

 

+++ IMPORT FOREIGN SCHEMA

원격 서버의 특정 스키마 또는 테이블 집합을 로컬의 foreign table로 자동 생성하는 명령입니다.

-- 사용 예시 --

-- 로컬에 schema 생성
CREATE SCHEMA IF NOT EXISTS remote_data;

-- 원격 스키마 전체 가져오기
IMPORT FOREIGN SCHEMA public
  FROM SERVER myserver
  INTO remote_data
  OPTIONS (
    import_collate 'true',
    import_default 'false',
    import_not_null 'true'
  );

-- 특정 테이블만 가져오기
IMPORT FOREIGN SCHEMA public
  LIMIT TO (employees, departments)
  FROM SERVER myserver
  INTO remote_data;

-- 특정 테이블 제외하고 가져오기
IMPORT FOREIGN SCHEMA public
  EXCEPT (temp_table)
  FROM SERVER myserver
  INTO remote_data;

2. 설치 방법

PostgreSQL 소스 파일의 경우 저는 보통 wget으로 받아서 사용합니다.

posgres_fdw는 로컬 서버에만 설치합니다.

 

cd ~
wget https://ftp.postgresql.org/pub/source/v17.5/postgresql-17.5.tar.gz

 

▸ postgres_fdw 설치

cd /home/postgres/postgresql-17.5/contrib/postgres_fdw

make USE_PGXS=1
make install

 

▸ 익스텐션 추가

CREATE EXTENSION IF NOT EXISTS postgres_fdw;


3. 시나리오

▸ 환경

구분 버전 DB명 설명
소스 Rocky 8.10 / PostgreSQL 10.23 src 데이터 계속 INSERT (or UPDATE)
타겟 Rocky 8.10 / PostgreSQL 17.5 tgt 1 시간마다 증분 수집·집계

 

▸ Remote DB 설정

-- fdw 유저 생성
CREATE ROLE fdw_user LOGIN PASSWORD '1234';

-- 테스트용 데이터베이스 생성
CREATE DATABASE test OWNER fdw_user;

-- 커넥션 변경
\c test fdw_user

-- 테스트 테이블 생성
CREATE TABLE public.metrics (
    ts    TIMESTAMPTZ PRIMARY KEY,
    value NUMERIC
);

-- 권한 설정 (필요시)
GRANT SELECT,INSERT ON public.metrics TO fdw_user;

-- 데이터 삽입
INSERT INTO public.metrics (ts, value)
SELECT  date_trunc('minute', now())
        + (g * interval '1 microsecond') ,
        random()
FROM    generate_series(0, 999999) AS g;
-- 플래그 컬럼 추가
ALTER TABLE public.metrics
  ADD COLUMN created_at  timestamptz NOT NULL DEFAULT now(),
  ADD COLUMN updated_at  timestamptz NOT NULL DEFAULT now();
  
-- 업데이트 트리거 함수 정의
CREATE OR REPLACE FUNCTION trg_set_updated_at()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
  NEW.updated_at := now(); -- 업데이트마다 현재 시각으로 갱신
  RETURN NEW;
END;
$$;

-- 트리거 바인딩(PG 10~11)
CREATE TRIGGER set_updated_at
BEFORE UPDATE ON public.metrics
FOR EACH ROW
EXECUTE PROCEDURE trg_set_updated_at();

-- + 트리거 바인딩(PG 12+)
CREATE TRIGGER set_updated_at
BEFORE UPDATE ON public.metrics
FOR EACH ROW
EXECUTE FUNCTION trg_set_updated_at();

 

▸ Local DB 설정

-- fdw 유저 생성
CREATE ROLE fdw_user LOGIN PASSWORD '1234';

-- 테스트용 데이터베이스 생성
CREATE DATABASE tgt OWNER fdw_user;
ALTER ROLE fdw_user SUPERUSER; --fdw server 생성을 위해

-- 커넥션 변경
\c tgt fdw_user

/* fdw 설정 */
-- fdw 익스텐션 추가
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

-- fdw 서버 생성
CREATE SERVER src_server
        FOREIGN DATA WRAPPER postgres_fdw
        OPTIONS (host '10.12.20.105', port '5432', dbname 'test');

-- fdw 유저 매핑
CREATE USER MAPPING FOR fdw_user
        SERVER src_server
        OPTIONS (user 'fdw_user', password '1234');
-- 원격 참조용 FOREIGN TABLE 생성
CREATE FOREIGN TABLE public.metrics_src (
    ts          timestamptz,
    value       numeric,
    updated_at  timestamptz
) SERVER src_server
  OPTIONS (schema_name 'public', table_name 'metrics',
           fetch_size '10000');
           
-- 동기화용 로컬 테이블 생성
CREATE TABLE public.metrics_local (
    ts          timestamptz PRIMARY KEY,
    value       numeric
);
-- 동기화 메타 정보 테이블 생성
CREATE TABLE public.sync_meta (
  src_server text PRIMARY KEY,
  last_ts    timestamptz
);

-- 초기 데이터 삽입
INSERT INTO public.sync_meta VALUES ('test', 'epoch');
-- 초기 적재
MERGE INTO public.metrics_local AS tgt
USING public.metrics_src AS src
ON  (tgt.ts = src.ts)
WHEN MATCHED THEN
     UPDATE SET value = src.value
WHEN NOT MATCHED THEN
     INSERT (ts, value)
     VALUES (src.ts, src.value);
#!/bin/bash
##############################################################################
# 동기화 배치: 소스(${SRC_TABLE}) → 타겟(${LOCAL_TABLE})
##############################################################################

# 0) 사용자 정의 변수 ---------------------------------------------------------
PG_BIN="/postgres/bin"
DB_HOST="127.0.0.1"
DB_PORT="5432"
DB_NAME="tgt"
DB_USER="fdw_user"
DB_PASS="1234"

SRC_SERVER="test"
SRC_TABLE="public.metrics_src"
LOCAL_TABLE="public.metrics_local"
SYNC_META_TABLE="public.sync_meta"

LOG_PATH="/data/postgres_data/log/sync_metrics.log"

# 1) 공통 환경 변수 ----------------------------------------------------------
export PATH="$PG_BIN:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"
export PGOPTIONS="-c timezone=UTC"
export PGPASSWORD="$DB_PASS"
PSQL_OPTS="-v ON_ERROR_STOP=1 -q -A -t -F',' -h $DB_HOST -p $DB_PORT -d $DB_NAME -U $DB_USER"

# 2) 로그 --------------------------------------------------------------------
mkdir -p "$(dirname "$LOG_PATH")"
exec >>"$LOG_PATH" 2>&1
log(){ echo "[$(date '+%F %T')] $*"; }

log "batch_started"

# 3) 동기화 SQL --------------------------------------------------------------
read -r -d '' SQL_STMT <<EOSQL
WITH last AS (
  SELECT COALESCE(last_ts, 'epoch'::timestamptz) AS from_ts
  FROM   $SYNC_META_TABLE
  WHERE  src_server = '$SRC_SERVER'
),
delta AS (
  SELECT ts, value, updated_at
  FROM   $SRC_TABLE, last
  WHERE  updated_at > from_ts
),
merged AS (
  MERGE INTO $LOCAL_TABLE AS tgt
  USING delta AS src
  ON  (tgt.ts = src.ts)
  WHEN MATCHED THEN
       UPDATE SET value = src.value
  WHEN NOT MATCHED THEN
       INSERT (ts, value) VALUES (src.ts, src.value)
  RETURNING merge_action() AS action           -- <─ ★
),
upd_meta AS (                                   -- 메타 테이블 갱신
  UPDATE $SYNC_META_TABLE
  SET    last_ts = (SELECT COALESCE(MAX(updated_at), last_ts) FROM $SRC_TABLE WHERE updated_at > (SELECT from_ts FROM last))
  WHERE  src_server = '$SRC_SERVER'
)
SELECT
  COUNT(*) FILTER (WHERE action = 'INSERT') AS ins_cnt,
  COUNT(*) FILTER (WHERE action = 'UPDATE') AS upd_cnt
FROM merged;
EOSQL

# 4) psql 실행 후 결과 캡처 ---------------------------------------------------
COUNTS=$(runuser -l postgres -c "psql $PSQL_OPTS -c \"$SQL_STMT\"")
RET=$?   # runuser 내부의 psql 종료코드

if [ $RET -eq 0 ]; then
    IFS=',' read -r INS_CNT UPD_CNT <<< "$COUNTS"
    log "sync_completed_successfully inserted=$INS_CNT updated=$UPD_CNT"
else
    log "sync_failed_check_above (psql_exit=$RET)"
    exit 1
fi

log "batch_finished"
온전한 단일 트랜잭션 처리를 하시려면 SQL에 BEGIN; COMMIT;를 추가하시거나, psql -1 옵션을 추가하시면 됩니다.
단, -1 옵션을 사용하실 경우 SQL에 BEGIN; COMMIT;가 있으면 충돌이 발생합니다.
chmod +x sync_metrics.sh
# 크론 설정
0 * * * * /bin/bash /postgres/etc/sync_metrics.sh

 

이후에 초기 적재 건 수와 비교하면 테스트 해보시면 됩니다.

 

+ 비율 만큼 랜덤 업데이트

UPDATE public.metrics
SET value = random()
WHERE ts IN (
  SELECT ts
  FROM public.metrics
  TABLESAMPLE SYSTEM (20) -- 20%
);

오늘은 여기까지~

 

728x90