- 最后登录
- 2018-11-14
- 在线时间
- 1 小时
- 威望
- 0
- 金钱
- 8
- 注册时间
- 2018-9-14
- 阅读权限
- 10
- 帖子
- 2
- 精华
- 0
- 积分
- 0
- UID
- 2520
|
1#
发表于 2018-9-14 13:55:27
|
查看: 3080 |
回复: 0
oggforbigdata版本为:OGGCORE_OGGADP.12.3.0.1.2 for linux x64
用以下kafka.props配置:
[oggmes@server6 ogg]$ cat dirprm/kafka.props
gg.handlerlist = kafkahandler
gg.handler.kafkahandler.type = kafka
gg.handler.kafkahandler.KafkaProducerConfigFile = custom_kafka_producer.properties
#gg.handler.kafkahandler.TopicName = tpmes
gg.handler.kafkahandler.BlockingSend = false
gg.handler.kafkahandler.includeTokens = true
gg.handler.kafkahandler.topicMappingTemplate=${tableName}
gg.handler.kafkahandler.keyMappingTemplate=${primaryKeys}
gg.handler.name.format.pkUpdateHandlingformat.pkUpdateHandling = delete-insert
gg.handler.kafkahandler.SchemaTopicName=tsmes
gg.handler.kafkahandler.Mode = op
gg.handler.kafkah andler.topicPartitioning = table
#json
gg.handler.kafkahandler.format = json
gg.handler.kafkahandler.format.insertOpKey = I
gg.handler.kafkahandler.format.updateOpKey = U
gg.handler.kafkahandler.format.deleteOpKey = D
gg.handler.kafkahandler.format.prettyPrint = true
gg.handler.kafkahandler.format.jsonDelimiter = CDATA[]
#gg.handler.kafkahandler.format.generateSchema = true
#gg.handler.kafkahandler.format.schemaDirectory = dirdef
#gg.handler.kafkahandler.format.treatAllColumnsAsString = true
gg.handler.kafkahandler.format.includePrimaryKeys = true
#gg.handler.kafkahandler.maxGroupSize =100, 1Mb
#gg.handler.kafkahandler.minGroupSize =50, 500Kb
goldengate.userexit.timestamp=utc
goldengate.userexit.writers=javawriter
javawriter.stats.display=TRUE
#javawriter.stats.full=TRUE
gg.log=log4j
gg.log.level=INFO
gg.report.time=30sec
gg.classpath=dirprm/:/home/oggmes/kafka/libs/*
javawriter.bootoptions=-Xmx512m -Xms32m -Djava.class.path=ggjava/ggjava.jar
得到的json 主题输出为dirdef/CMES.ERP_SALE_ORDER.schema.json 的内容
[oggmes@server6 ogg]$ cat dirdef/CMES.ERP_SALE_ORDER.schema.json
{
"$schema":"http://json-schema.org/draft-04/schema#",
"title":"CMES.ERP_SALE_ORDER",
"description":"JSON schema for table CMES.ERP_SALE_ORDER",
"definitions":{
"row":{
"type":"object",
"properties":{
"ID":{
"type":[
"string",
"null"
]
},
"USER_CREATED":{
"type":[
"string",
"null"
]
},
"DATETIME_CREATED":{
"type":[
"string",
"null"
]
},
"USER_MODIFIED":{
"type":[
"string",
"null"
]
},
"DATETIME_MODIFIED":{
"type":[
"string",
"null"
]
},
"STATE":{
"type":[
"string",
"null"
]
},
"ENTERPRISE_ID":{
"type":[
"string",
"null"
]
},
"ORG_ID":{
"type":[
"string",
"null"
]
},
"BILL_NO":{
"type":[
"string",
"null"
]
},
"CUSTOMER_CODE":{
"type":[
"string",
"null"
]
},
"DELIVER_ADDRESS":{
"type":[
"string",
"null"
]
},
"IS_MANUAL":{
"type":[
"string",
"null"
]
},
"REMARK":{
"type":[
"string",
"null"
]
},
"ERP_ID":{
"type":[
"string",
"null"
]
},
"STATUS":{
"type":[
"string",
"null"
]
}
},
"additionalProperties":false
},
"tokens":{
"type":"object",
"description":"Token keys and values are free form key value pairs.",
"properties":{
},
"additionalProperties":true
}
},
"type":"object",
"properties":{
"table":{
"description":"The fully qualified table name",
"type":"string"
},
"op_type":{
"description":"The operation type",
"type":"string"
},
"op_ts":{
"description":"The operation timestamp",
"type":"string"
},
"current_ts":{
"description":"The current processing timestamp",
"type":"string"
},
"pos":{
"description":"The position of the operation in the data source",
"type":"string"
},
"primary_keys":{
"description":"Array of the primary key column names.",
"type":"array",
"items":{
"type":"string"
},
"minItems":0,
"uniqueItems":true
},
"tokens":{
"$ref":"#/definitions/tokens"
},
"before":{
"$ref":"#/definitions/row"
},
"after":{
"$ref":"#/definitions/row"
}
},
"required":[
"table",
"op_type",
"op_ts",
"current_ts",
"pos"
],
"additionalProperties":false
并不是正常的json消息输出:
offset = 287, value =
{
"table":"CMES.ERP_SALE_ORDER",
"op_type":"U",
"op_ts":"2018-09-14 00:37:48.699888",
"current_ts":"2018-09-14T08:37:55.273000",
"pos":"00000000020000007525",
"tokens":{
},
"before":{
"ID":"3",
"USER_CREATED":"DD",
"DATETIME_CREATED":"2018-09-06:17:13:01",
"USER_MODIFIED":null,
"DATETIME_MODIFIED":null,
"STATE":"C",
"ENTERPRISE_ID":"###",
"ORG_ID":"2",
"BILL_NO":"2",
"CUSTOMER_CODE":"2",
"DELIVER_ADDRESS":null,
"IS_MANUAL":"2",
"REMARK":null,
"ERP_ID":null,
"STATUS":"ooo"
},
"after":{
"ID":"3",
"USER_CREATED":"WW",
"DATETIME_CREATED":"2018-09-06:17:13:01",
"USER_MODIFIED":null,
"DATETIME_MODIFIED":null,
"STATE":"C",
"ENTERPRISE_ID":"###",
"ORG_ID":"2",
"BILL_NO":"2",
"CUSTOMER_CODE":"2",
"DELIVER_ADDRESS":null,
"IS_MANUAL":"2",
"REMARK":null,
"ERP_ID":null,
"STATUS":"ooo"
}
}
请问,这是什么原因??? |
|