ElasticSearch & Logstash & Kibana 개념 정리에서 설명한 DataFlow 중 Input을 통해 우리 서비스의 DB에 접근하여 데이터를 가져올 수 있다.

 input {DB에 접근하여 데이터 수집}
 
 filter {}
 
 output {}

JDBC Input Plugin

JDBC Input Plugin은 Logstash의 내장 Input Plugin 중 하나다. JDBC Interface을 지원하는 관계형 데이터베이스는 이 Input Plugin을 통해 쿼리를 사용하여 데이터를 수집하고 처리할 수 있게 된다.

Data Flow

Untitled

input {
  jdbc {
    jdbc_driver_library => "/usr/share/logstash/logstash-core/lib/jars/postgresql-42.2.27.jre7.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "${DB_URL}"
    jdbc_user => "${DB_USER}"
    jdbc_password => "${DB_PASSWORD}"
    jdbc_validate_connection => true
    use_column_value => true 
    tracking_column => "updatedat"    
    tracking_column_type => "timestamp"
    clean_run => false
    jdbc_paging_enabled => true
    jdbc_page_size => 10000
    jdbc_pool_timeout => 300
    schedule => "* * * * *"
    statement => '
      SELECT
        lecture."id",
        lecture."title",
        Lecture."stars",
        Lecture."price",
        Lecture."reviewCount",
        Lecture."isGroup",
        Lecture."difficultyLevel",
        Lecture."startDate",
        Lecture."endDate",
        Lecture."updatedAt",
        Lecture."isActive",

        (SELECT  lectureMethod.name
          FROM "LectureMethod" lectureMethod
          WHERE lecture."lectureMethodId" = lectureMethod.id) AS lectureMethod,

        (SELECT json_agg(lectureImage."imageUrl")::text
          FROM "LectureImage" lectureImage
          WHERE lecture."id" = lectureImage."lectureId") AS images_string,

        (SELECT json_build_object( 
          \\'lecturerId\\', lecturer.id,
          \\'nickname\\', lecturer."nickname",
          \\'profileCardImageUrl\\', lecturer."profileCardImageUrl" )::text
          FROM "Lecturer" lecturer
          WHERE lecture."lecturerId" = lecturer.id) AS lecturer_string,
          
        (SELECT json_agg(json_build_object( 
            \\'regionId\\', region.id,
            \\'administrativeDistrict\\', region."administrativeDistrict", 
            \\'district\\', region."district"
          ))::text
          FROM "LectureToRegion" lectureToRegion
          JOIN "Region" region ON lectureToRegion."regionId" = region.id
          WHERE lecture.id = lectureToRegion."lectureId") AS regions_string,

        (SELECT json_agg(json_build_object( 
          \\'categoryId\\', lectureToDanceGenre."danceCategoryId",
          \\'genre\\', 
          CASE
          WHEN lectureToDanceGenre."name" IS NULL THEN danceCategory."genre"
          ELSE lectureToDanceGenre."name"
        END))::text
        FROM "LectureToDanceGenre" lectureToDanceGenre 
        JOIN "DanceCategory" danceCategory 
        ON lectureToDanceGenre."danceCategoryId" = danceCategory.id
        WHERE lecture.id = lectureToDanceGenre."lectureId") AS genres_string

      FROM
        "Lecture" lecture

      WHERE
          lecture."updatedAt" > :sql_last_value
      '
    last_run_metadata_path =>"/usr/share/logstash/lecture-inspector-index.dat"
 }
}

Option 설명

schedule: cron과 같이 주기를 설정 할 수 있다. "* * * * *"으로 설정하여 1분마다 쿼리를 실행시킨다.

statement: sql을 직접 작성하거나 파일 경로를 지정하여 실행시킬 수 있다.

tracking_column: 말 그대로 트래킹 할 컬럼을 지정해준다.

last_run_metadata_path: 트래킹 한 컬럼의 데이터를 저장하는 경로이며 아래 쿼리의 sql_last_value에 할당되는 데이터를 저장한다.

WHERE
          lecture."updatedAt" > :sql_last_value

데이터 수집 흐름

Untitled

  1. 1분마다 쿼리에 맞는 강의 데이터를 수집하고, lecture데이터 중 updateAt을 트래킹한다. 만약 첫 실행이라면 이미지와 같이 sql_last_value를 1970년도로 쿼리를 날린다.
  2. 수집한 강의 목록 중 마지막 updateAt이 2024-03-27 09:00:00라면 last_run_metadata_path안에 있는 data의 값은 2024-03-27 09:00:00 가 저장된다.
  3. logstash는 lecture."updatedAt" > :sql_last_value에2024-03-27 09:00:00를 할당하여 쿼리를 1분마다 보내게 된다.