ElasticSearch & Logstash & Kibana 개념 정리에서 설명한 DataFlow 중 Input을 통해 우리 서비스의 DB에 접근하여 데이터를 가져올 수 있다.
input {DB에 접근하여 데이터 수집}
filter {}
output {}
JDBC Input Plugin
JDBC Input Plugin은 Logstash의 내장 Input Plugin 중 하나다. JDBC Interface을 지원하는 관계형 데이터베이스는 이 Input Plugin을 통해 쿼리를 사용하여 데이터를 수집하고 처리할 수 있게 된다.
Data Flow
input {
jdbc {
jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.27.jre7.jar"
jdbc_driver_class => "org.postgresql.Driver"
jdbc_connection_string => "${DB_URL}"
jdbc_user => "${DB_USER}"
jdbc_password => "${DB_PASSWORD}"
jdbc_validate_connection => true
use_column_value => true
tracking_column => "updatedat"
tracking_column_type => "timestamp"
clean_run => false
jdbc_paging_enabled => true
jdbc_page_size => 10000
jdbc_pool_timeout => 300
schedule => "* * * * *"
statement => '
SELECT
lecture."id",
lecture."title",
Lecture."stars",
Lecture."price",
Lecture."reviewCount",
Lecture."isGroup",
Lecture."difficultyLevel",
Lecture."startDate",
Lecture."endDate",
Lecture."updatedAt",
Lecture."isActive",
(SELECT lectureMethod.name
FROM "LectureMethod" lectureMethod
WHERE lecture."lectureMethodId" = lectureMethod.id) AS lectureMethod,
(SELECT json_agg(lectureImage."imageUrl")::text
FROM "LectureImage" lectureImage
WHERE lecture."id" = lectureImage."lectureId") AS images_string,
(SELECT json_build_object(
\\'lecturerId\\', lecturer.id,
\\'nickname\\', lecturer."nickname",
\\'profileCardImageUrl\\', lecturer."profileCardImageUrl" )::text
FROM "Lecturer" lecturer
WHERE lecture."lecturerId" = lecturer.id) AS lecturer_string,
(SELECT json_agg(json_build_object(
\\'regionId\\', region.id,
\\'administrativeDistrict\\', region."administrativeDistrict",
\\'district\\', region."district"
))::text
FROM "LectureToRegion" lectureToRegion
JOIN "Region" region ON lectureToRegion."regionId" = region.id
WHERE lecture.id = lectureToRegion."lectureId") AS regions_string,
(SELECT json_agg(json_build_object(
\\'categoryId\\', lectureToDanceGenre."danceCategoryId",
\\'genre\\',
CASE
WHEN lectureToDanceGenre."name" IS NULL THEN danceCategory."genre"
ELSE lectureToDanceGenre."name"
END))::text
FROM "LectureToDanceGenre" lectureToDanceGenre
JOIN "DanceCategory" danceCategory
ON lectureToDanceGenre."danceCategoryId" = danceCategory.id
WHERE lecture.id = lectureToDanceGenre."lectureId") AS genres_string
FROM
"Lecture" lecture
WHERE
lecture."updatedAt" > :sql_last_value
'
last_run_metadata_path =>"/usr/share/logstash/lecture-inspector-index.dat"
}
}
Option 설명
schedule: cron과 같이 주기를 설정 할 수 있다. "* * * * *"으로 설정하여 1분마다 쿼리를 실행시킨다.
statement: sql을 직접 작성하거나 파일 경로를 지정하여 실행시킬 수 있다.
tracking_column: 말 그대로 트래킹 할 컬럼을 지정해준다.
last_run_metadata_path: 트래킹 한 컬럼의 데이터를 저장하는 경로이며 아래 쿼리의 sql_last_value에 할당되는 데이터를 저장한다.
WHERE
lecture."updatedAt" > :sql_last_value
데이터 수집 흐름