SmartCar_Agent.sources = SmartCarInfo_SpoolSource DriverCarInfo_TailSource
SmartCar_Agent.channels = SmartCarInfo_Channel DriverCarInfo_Channel
SmartCar_Agent.sinks = SmartCarInfo_LoggerSink DriverCarInfo_KafkaSink
-> 플럼의 에이전트에서 사용할 Source, Channel, Sink의 각 리소스 변수를 정의한 것이다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.type = spooldir
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.spoolDir = /home/pilot-pjt/working/car-batch-log
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.deletePolicy = immediate
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.batchSize = 1000
-> 에이전트의 Source를 설정한다. ‘spooldir’은 지정한 특정 디렉터리를 모니터링하고 있다가 새로운 파일이 생성되면 이벤트를 감지해서 “batchSize”의 설정값만큼 읽어서 Channel에 데이터를 전송한다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors = filterInterceptor
-> 수집 데이터를 필터링하기 위해 filterInterceptor 변수 선언 후 SmartCarInfo_SpoolSource에 할당했다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.type = regex_filter
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.regex = ^\\d{14}
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.interceptors.filterInterceptor.excludeEvents = false
->filterInterceptor의 type을 ‘regex_filter’로 설정하였는데 regex는 ‘Regular expression’ 즉 정규 표현식의 줄임말로써 정규 표현식을 이용해 수집 데이터를 필터링할 때 유용하게 사용이 가능하다.
SmartCar_Agent.channels.SmartCarInfo_Channel.type = memory
SmartCar_Agent.channels.SmartCarInfo_Channel.capacity = 100000
SmartCar_Agent.channels.SmartCarInfo_Channel.transactionCapacity = 10000
-> 에이전트의 Channel로서 SmartCarInfo_Channel의 Type을 ‘memory’로 설정했다.
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.type = logger
-> 에이전트의 최종 목적지다. SmartCarInfor_LoggerSink의 값을 ‘logger’로 설정하였다. Logger Sink는 수집한 데이터를 테스트 및 디버깅 목적으로 플럼의 표준 출력 로그파일에 출력한다.
SmartCar_Agent.sources.SmartCarInfo_SpoolSource.channels = SmartCarInfo_Channel
SmartCar_Agent.sinks.SmartCarInfo_LoggerSink.channel = SmartCarInfo_Channel
-> Source와 Channel Sink를 연결한다. 앞서 정의한 SmartCarInfo_SpoolSouce의 채널값을 SmartCarInfo_Channel로 설정하고, SmartCarInfo_LoggerSink의 채널 값도 SmartCarInfo_Channel로 설정해서 File -> Channel -> Sink로 이어지는 에이전트 리소스를 하나로 연결해준다.
SmartCar_Agent.sources.DriverCarInfo_TailSource.type = exec
SmartCar_Agent.sources.DriverCarInfo_TailSource.command = tail -F /home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfo.log
SmartCar_Agent.sources.DriverCarInfo_TailSource.restart = true
SmartCar_Agent.sources.DriverCarInfo_TailSource.batchSize = 1000
-> Source의 type은 ‘exec’인데 ‘exec’는 플럼 외부에서 수행한 명령의 결과를 플럼의 Event로 가져와 수집할 수 있는 기능을 제공한다. 스마트카 운전자의 운정정보가 로그 시뮬레이터를 통해 ‘/home/pilot-pjt/working/driver-realtime-log/SmartCarDriverInfor.log’에 생성된다. 리눅스의 ‘tall’명령은 플럼의 ‘exec’를 실행하게 해서 운전자의 실시간 운행 정보를 수집한다.
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors = filterInterceptor2
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.type = regex_filter
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.regex = ^\\d{14}
SmartCar_Agent.sources.DriverCarInfo_TailSource.interceptors.filterInterceptor2.excludeEvents = false
-> Interceptor를 정의하였는데 여기서도 데이터를 필터링하기 위한 ‘regex_filter’만 추가했다.
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.topic = SmartCar-Topic
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.brokerList = server02.hadoop.com:9092
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.requiredAcks = 1
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.batchSize = 1000
-> 스마트카 운전자의 실시간 운행 정보는 플럼에서 수집과 동시에 카프카로 전송한다.
-> 카프카 브로커 서버가 실행 중인 server02.hadoop.com:9092에 연결해서 SmartCat-Topic에 데이터를 100개의 배치 크기로 전송한다.
SmartCar_Agent.channels.DriverCarInfo_Channel.type = memory
SmartCar_Agent.channels.DriverCarInfo_Channel.capacity= 100000
SmartCar_Agent.channels.DriverCarInfo_Channel.transactionCapacity = 10000
-> DriverCarInfo의 Channel을 Memory Channel로 선언했다.
SmartCar_Agent.sources.DriverCarInfo_TailSource.channels = DriverCarInfo_Channel
SmartCar_Agent.sinks.DriverCarInfo_KafkaSink.channel = DriverCarInfo_Channel
-> DriverCarInfo의 Source와 Sink의 Channel을 앞서 정의한 DriverCarInfo_Channel로 설정해서 Source-Channel-Sink의 구조를 완성했다.
그림 3.34 수집 기능 점검 1 -플럼의 표준 출력 로그 파일로 수집된 데이터 확인
그림 3.35 수집 기능 점검 2 - 카프카 Consumer로 실시간 수집 데이터 확인