PostgreSQL
PostgreSQL: FDW를 활용한 동기화 Batch
dewstream
2025. 7. 17. 08:00
728x90
※ PostgreSQL: Synchronization Batch Using FDW.
안녕하세요. 듀스트림입니다.
오늘 포스팅은 Postgres_fdw를 활용하여 데이터를 원천(Remote)에서 로컬로 동기화하는 방법에 대한 내용입니다.
이 구성은 주로 스테이징 DB를 따로 사용하거나, 프론트 영역과 OLAP 영역이 나눠져 있을 경우 나오는 아키텍처입니다.

1. postgres_fdw 개념과 핵심 기능
| 범주 | 설명 |
| 원격 테이블 매핑 | CREATE SERVER → CREATE USER MAPPING → CREATE FOREIGN TABLE만으로 외부 PostgreSQL 테이블을 지역 DB 객체처럼 다룸. |
| 푸시다운 최적화 | WHERE‧JOIN‧AGGREGATE를 가능한 한 원격 서버로 밀어 넣어 네트워크 전송량을 최소화. (필요 시 use_remote_estimate 옵션으로 원격 EXPLAIN 통계 사용) |
| 배치 INSERT | batch_size(기본 1) 옵션으로 한 번에 여러 행을 전송하여 왕복 RTT를 줄임. |
| 페치 사이즈 | fetch_size 옵션으로 커서‑기반 가져오기 배치를 조절해 메모리·네트워크 균형을 맞춤. |
| 비동기 스캔 | async_capable=true일 때 Append 하위 계획을 병렬로 실행해 복수 테이블 조회 시 속도를 높임. |
| 병렬 커밋/어보트 | parallel_commit/parallel_abort가 여러 원격 트랜잭션을 동시에 종료해 대기 시간을 단축. |
• fdw 라이브러리는 로컬 서버에서만 로드됩니다.
• Remote(소스) 쪽은 단순 TCP 접속 대상으로, FDW 확장 자체를 설치할 필요가 없습니다.
• FDW 코드가 동작하는 곳은 로컬 서버 프로세스입니다. Remote(소스) 서버는 FDW와 관계없이 그냥 클라이언트(로컬 서버)가 보내는 쿼리만 처리합니다.
+ fdw 옵션 정리
| 옵션 | 적용 위치 | 타입 | 기본값 | 설명 |
| schema_name | foreign table | string | foreign schema | 원격 객체의 스키마 명 지정 |
| table_name | foreign table | string | foreign table | 원격 테이블 명 지정 |
| column_name | foreign table column | string | column name | 원격 컬럼명 매핑 |
| use_remote_estimate | server / table | boolean | false | 원격 EXPLAIN 사용 여부 |
| fdw_startup_cost | server | float | 100 | 원격 접속 및 초기 오버헤드 |
| fdw_tuple_cost | server | float | 0.2 | 레코드 전송 비용 |
| analyze_sampling | server / table | string | auto | ANALYZE 수행 방식 |
| extensions | server | string | — | 원격 실행 허용 확장 목록 |
| fetch_size | server / table | integer | 100 | 한 번에 가져올 행 수 |
| batch_size | server / table | integer | 1 | DML 일괄 처리 시 한번에 전송 행 수 |
| async_capable | server / table | boolean | false | 비동기 실행 가능 여부 |
| parallel_commit | server | boolean | false | 병렬 커밋 여부 |
| parallel_abort | server | boolean | false | 병렬 롤백 여부 |
| updatable | server / table | boolean | true | DML 가능 여부 |
| truncatable | server / table | boolean | true | TRUNCATE 가능 여부 |
| keep_connections | server | boolean | on | 세션 재사용 연결 유지 여부 |
| import_collate | IMPORT FOREIGN SCHEMA | boolean | true | COLLATE 포함 여부 |
| import_default | IMPORT FOREIGN SCHEMA | boolean | false | DEFAULT 포함 여부 |
| import_generated | IMPORT FOREIGN SCHEMA | boolean | true | GENERATED 컬럼 포함 여부 |
| import_not_null | IMPORT FOREIGN SCHEMA | boolean | true | NOT NULL 포함 여부 |
++ use_remote_estimate
- use_remote_estimate = true
- 원격 서버에 EXPLAIN 명령을 보내 계획 비용 및 행 수를 수집.
- 이후 fdw_startup_cost + fdw_tuple_cost를 해당 값에 추가.
- 장점: 정확도 높음, 원격 조인(pushdown) 활용 가능.
- 단점: EXPLAIN 요청 비용, 네트워크 지연 있음.
- use_remote_estimate = false
- 로컬에서 통계 기반 추정 수행(분석된 통계 테이블 사용).
- 추가로 fdw_startup_cost + fdw_tuple_cost 적용.
- 비용: 반복 적용된 fdw_tuple_cost로 인해 과다 평가 가능.
- 단점: 원격 테이블 통계 없거나 오래되면 부정확.
# 플로우 차트
클라이언트 쿼리 → 계획 수립 단계
├─ 서버/표 레벨에 use_remote_estimate 설정?
│ ├─ true
│ │ └→ 원격 서버에 EXPLAIN 실행 → 비용/행 수 받아옴
│ └─ false
│ └→ 로컬 추정 (statistics 또는 default)
│
→ fdw_startup_cost + fdw_tuple_cost 더함
→ 계획 완료
+++ IMPORT FOREIGN SCHEMA
원격 서버의 특정 스키마 또는 테이블 집합을 로컬의 foreign table로 자동 생성하는 명령입니다.
-- 사용 예시 --
-- 로컬에 schema 생성
CREATE SCHEMA IF NOT EXISTS remote_data;
-- 원격 스키마 전체 가져오기
IMPORT FOREIGN SCHEMA public
FROM SERVER myserver
INTO remote_data
OPTIONS (
import_collate 'true',
import_default 'false',
import_not_null 'true'
);
-- 특정 테이블만 가져오기
IMPORT FOREIGN SCHEMA public
LIMIT TO (employees, departments)
FROM SERVER myserver
INTO remote_data;
-- 특정 테이블 제외하고 가져오기
IMPORT FOREIGN SCHEMA public
EXCEPT (temp_table)
FROM SERVER myserver
INTO remote_data;
2. 설치 방법
PostgreSQL 소스 파일의 경우 저는 보통 wget으로 받아서 사용합니다.
posgres_fdw는 로컬 서버에만 설치합니다.
cd ~
wget https://ftp.postgresql.org/pub/source/v17.5/postgresql-17.5.tar.gz
▸ postgres_fdw 설치
cd /home/postgres/postgresql-17.5/contrib/postgres_fdw
make USE_PGXS=1
make install
▸ 익스텐션 추가
CREATE EXTENSION IF NOT EXISTS postgres_fdw;

3. 시나리오
▸ 환경
| 구분 | 버전 | DB명 | 설명 |
| 소스 | Rocky 8.10 / PostgreSQL 10.23 | src | 데이터 계속 INSERT (or UPDATE) |
| 타겟 | Rocky 8.10 / PostgreSQL 17.5 | tgt | 1 시간마다 증분 수집·집계 |
▸ Remote DB 설정
-- fdw 유저 생성
CREATE ROLE fdw_user LOGIN PASSWORD '1234';
-- 테스트용 데이터베이스 생성
CREATE DATABASE test OWNER fdw_user;
-- 커넥션 변경
\c test fdw_user
-- 테스트 테이블 생성
CREATE TABLE public.metrics (
ts TIMESTAMPTZ PRIMARY KEY,
value NUMERIC
);
-- 권한 설정 (필요시)
GRANT SELECT,INSERT ON public.metrics TO fdw_user;
-- 데이터 삽입
INSERT INTO public.metrics (ts, value)
SELECT date_trunc('minute', now())
+ (g * interval '1 microsecond') ,
random()
FROM generate_series(0, 999999) AS g;
-- 플래그 컬럼 추가
ALTER TABLE public.metrics
ADD COLUMN created_at timestamptz NOT NULL DEFAULT now(),
ADD COLUMN updated_at timestamptz NOT NULL DEFAULT now();
-- 업데이트 트리거 함수 정의
CREATE OR REPLACE FUNCTION trg_set_updated_at()
RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
NEW.updated_at := now(); -- 업데이트마다 현재 시각으로 갱신
RETURN NEW;
END;
$$;
-- 트리거 바인딩(PG 10~11)
CREATE TRIGGER set_updated_at
BEFORE UPDATE ON public.metrics
FOR EACH ROW
EXECUTE PROCEDURE trg_set_updated_at();
-- + 트리거 바인딩(PG 12+)
CREATE TRIGGER set_updated_at
BEFORE UPDATE ON public.metrics
FOR EACH ROW
EXECUTE FUNCTION trg_set_updated_at();
▸ Local DB 설정
-- fdw 유저 생성
CREATE ROLE fdw_user LOGIN PASSWORD '1234';
-- 테스트용 데이터베이스 생성
CREATE DATABASE tgt OWNER fdw_user;
ALTER ROLE fdw_user SUPERUSER; --fdw server 생성을 위해
-- 커넥션 변경
\c tgt fdw_user
/* fdw 설정 */
-- fdw 익스텐션 추가
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
-- fdw 서버 생성
CREATE SERVER src_server
FOREIGN DATA WRAPPER postgres_fdw
OPTIONS (host '10.12.20.105', port '5432', dbname 'test');
-- fdw 유저 매핑
CREATE USER MAPPING FOR fdw_user
SERVER src_server
OPTIONS (user 'fdw_user', password '1234');
-- 원격 참조용 FOREIGN TABLE 생성
CREATE FOREIGN TABLE public.metrics_src (
ts timestamptz,
value numeric,
updated_at timestamptz
) SERVER src_server
OPTIONS (schema_name 'public', table_name 'metrics',
fetch_size '10000');
-- 동기화용 로컬 테이블 생성
CREATE TABLE public.metrics_local (
ts timestamptz PRIMARY KEY,
value numeric
);
-- 동기화 메타 정보 테이블 생성
CREATE TABLE public.sync_meta (
src_server text PRIMARY KEY,
last_ts timestamptz
);
-- 초기 데이터 삽입
INSERT INTO public.sync_meta VALUES ('test', 'epoch');
-- 초기 적재
MERGE INTO public.metrics_local AS tgt
USING public.metrics_src AS src
ON (tgt.ts = src.ts)
WHEN MATCHED THEN
UPDATE SET value = src.value
WHEN NOT MATCHED THEN
INSERT (ts, value)
VALUES (src.ts, src.value);
#!/bin/bash
##############################################################################
# 동기화 배치: 소스(${SRC_TABLE}) → 타겟(${LOCAL_TABLE})
##############################################################################
# 0) 사용자 정의 변수 ---------------------------------------------------------
PG_BIN="/postgres/bin"
DB_HOST="127.0.0.1"
DB_PORT="5432"
DB_NAME="tgt"
DB_USER="fdw_user"
DB_PASS="1234"
SRC_SERVER="test"
SRC_TABLE="public.metrics_src"
LOCAL_TABLE="public.metrics_local"
SYNC_META_TABLE="public.sync_meta"
LOG_PATH="/data/postgres_data/log/sync_metrics.log"
# 1) 공통 환경 변수 ----------------------------------------------------------
export PATH="$PG_BIN:/usr/sbin:/usr/bin:/sbin:/bin:$PATH"
export PGOPTIONS="-c timezone=UTC"
export PGPASSWORD="$DB_PASS"
PSQL_OPTS="-v ON_ERROR_STOP=1 -q -A -t -F',' -h $DB_HOST -p $DB_PORT -d $DB_NAME -U $DB_USER"
# 2) 로그 --------------------------------------------------------------------
mkdir -p "$(dirname "$LOG_PATH")"
exec >>"$LOG_PATH" 2>&1
log(){ echo "[$(date '+%F %T')] $*"; }
log "batch_started"
# 3) 동기화 SQL --------------------------------------------------------------
read -r -d '' SQL_STMT <<EOSQL
WITH last AS (
SELECT COALESCE(last_ts, 'epoch'::timestamptz) AS from_ts
FROM $SYNC_META_TABLE
WHERE src_server = '$SRC_SERVER'
),
delta AS (
SELECT ts, value, updated_at
FROM $SRC_TABLE, last
WHERE updated_at > from_ts
),
merged AS (
MERGE INTO $LOCAL_TABLE AS tgt
USING delta AS src
ON (tgt.ts = src.ts)
WHEN MATCHED THEN
UPDATE SET value = src.value
WHEN NOT MATCHED THEN
INSERT (ts, value) VALUES (src.ts, src.value)
RETURNING merge_action() AS action -- <─ ★
),
upd_meta AS ( -- 메타 테이블 갱신
UPDATE $SYNC_META_TABLE
SET last_ts = (SELECT COALESCE(MAX(updated_at), last_ts) FROM $SRC_TABLE WHERE updated_at > (SELECT from_ts FROM last))
WHERE src_server = '$SRC_SERVER'
)
SELECT
COUNT(*) FILTER (WHERE action = 'INSERT') AS ins_cnt,
COUNT(*) FILTER (WHERE action = 'UPDATE') AS upd_cnt
FROM merged;
EOSQL
# 4) psql 실행 후 결과 캡처 ---------------------------------------------------
COUNTS=$(runuser -l postgres -c "psql $PSQL_OPTS -c \"$SQL_STMT\"")
RET=$? # runuser 내부의 psql 종료코드
if [ $RET -eq 0 ]; then
IFS=',' read -r INS_CNT UPD_CNT <<< "$COUNTS"
log "sync_completed_successfully inserted=$INS_CNT updated=$UPD_CNT"
else
log "sync_failed_check_above (psql_exit=$RET)"
exit 1
fi
log "batch_finished"
온전한 단일 트랜잭션 처리를 하시려면 SQL에 BEGIN; COMMIT;를 추가하시거나, psql -1 옵션을 추가하시면 됩니다.
단, -1 옵션을 사용하실 경우 SQL에 BEGIN; COMMIT;가 있으면 충돌이 발생합니다.
chmod +x sync_metrics.sh
# 크론 설정
0 * * * * /bin/bash /postgres/etc/sync_metrics.sh
이후에 초기 적재 건 수와 비교하면 테스트 해보시면 됩니다.
+ 비율 만큼 랜덤 업데이트
UPDATE public.metrics
SET value = random()
WHERE ts IN (
SELECT ts
FROM public.metrics
TABLESAMPLE SYSTEM (20) -- 20%
);
오늘은 여기까지~
728x90