flink通过jdbc读取postgresql数据库里的数据

问题1: 从postgresql里使用flink-jdbc读取数据的问题, 数据类型不匹配,不支持jsonb等其他类型.

        使用create table 语句来读取,由于postgresql里设置的是jsonb的类型, 他又不让你输入sql语句(语句里需要写CAST函数),所以就抛错了
Caused by: java.lang.ClassCastException: org.postgresql.util.PGobject cannot be cast to java.lang.String
Caused by: java.lang.ClassCastException: org.postgresql.util.PGobject cannot be cast to java.lang.String
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:635)
    ... 9 more
问题2: partition必须是数字类型的字段
 另外,因为connector.read.partition.column他使用的是between语句,所以只能是数字,时间戳等,否则也会报错
. partition.column must be a numeric,
 -- date, or timestamp column from the table in question.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:627)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)
    at com.hellobike.search.flink.task.FullIndexBuilderTask.main(FullIndexBuilderTask.java:92)
Caused by: java.lang.Exception: java.lang.IllegalArgumentException: open() failed.ERROR: operator does not exist: character varying >= bigint
  建议:No operator matches the given name and argument type(s). You might need to add explicit type casts.
  位置:514
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: open() failed.ERROR: operator does not exist: character varying >= bigint
  建议:No operator matches the given name and argument type(s). You might need to add explicit type casts.
  位置:514
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:250)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:85)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: org.postgresql.util.PSQLException: ERROR: operator does not exist: character varying >= bigint
  建议:No operator matches the given name and argument type(s). You might need to add explicit type casts.
  位置:514
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2103)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:1836)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:257)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.execute(AbstractJdbc2Statement.java:512)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeWithFlags(AbstractJdbc2Statement.java:388)
    at org.postgresql.jdbc2.AbstractJdbc2Statement.executeQuery(AbstractJdbc2Statement.java:273)
    at org.apache.flink.api.java.io.jdbc.JDBCInputFormat.open(JDBCInputFormat.java:247)
    ... 4 more

发表回复

您的电子邮箱地址不会被公开。 必填项已用 * 标注