Mysql-CDC -> Doris Sink [BUG] #7050

Open
3 tasks done
BigdataSurvey opened this issue Jun 24, 2024 · 0 comments

BigdataSurvey commented Jun 24, 2024

Search before asking

  • I had searched in the issues and found no similar issues.

What happened

Extracting data from MySQL to Doris fails in one of two ways:

Situation 1: The job runs normally for about the first two minutes, then the log reports a "Label already" error (LABEL_ALREADY_EXISTS in the log). Before every start I change the "sink.label-prefix" item in the "*.conf.template" configuration file so that each run uses a unique prefix, but the error still appears after the job has run for a few minutes. Further testing showed that the error occurs when multiple tables are configured in the MySQL-CDC source: when rows in several of those tables are modified at the same time, the write to Doris fails with the label error. With only one table configured there is no problem, but our business scenario requires writing many sharded tables into a single Doris table. Is there a solution to this problem?

Situation 2: The task is interrupted during execution (InterruptedException): a thread waiting on a resource or condition is interrupted by another thread, even though neither the servers nor the network are currently under load.

Environment: the SeaTunnel cluster has four servers with ample resources and runs version 2.3.5. The Doris version is 2.1.2.

SeaTunnel Version

SeaTunnel Version: 2.3.5
Doris Version: 2.1.2

SeaTunnel Config

【./config/v2.gxy.streaming.conf.template】
env {
  parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 10000
}

source {
  MySQL-CDC {
    base-url = "jdbc:mysql://localhost:3306/practice?useSSL=false"
    username = "123"
    password = "123"
    table-names = [
      "practice.gxy_student_0",
      "practice.gxy_student_1",
      "practice.gxy_student_2",
      "practice.gxy_student_3",
      "practice.gxy_student_4"
    ]
    startup.mode = "latest"
  }
}

sink {
  Doris {
    fenodes = "localhost:8030"
    username = "123"
    password = "123"
    database = "tis_gxy"
    table = "gxy_student"
    table.identifier = "tis_gxy.gxy_student"
    sink.label-prefix = "label_92e1051f8765498e_0624_1016"
    sink.enable-2pc = "true"
    sink.enable-delete = "true"
    doris.config {
      format = "json"
      read_json_by_line = "true"
    }
  }
}

【./config/hazelcast.yaml】

hazelcast:
  cluster-name: seatunnel
  network:
    rest-api:
      enabled: true
      endpoint-groups:
        CLUSTER_WRITE:
          enabled: true
        DATA:
          enabled: true
    join:
      tcp-ip:
        enabled: true
        member-list:
          - 10.123.123.121
          - 10.123.123.122
          - 10.123.123.123
          - 10.123.123.124

    port:
      auto-increment: false
      port: 5801
  map:
    engine-map:
       map-store:
         enabled: true
         initial-mode: EAGER
         factory-class-name: org.apache.seatunnel.engine.server.persistence.FileMapStoreFactory
         properties:
           type: hdfs
           namespace: /tmp/seatunnel/imap
           clusterName: seatunnel-cluster
           storage.type: oss
           block.size: 10240
           oss.bucket: oss://mogudb/
           fs.oss.accessKeyId: **
           fs.oss.accessKeySecret: **
           fs.oss.endpoint: cn-qingdao-internal.oss-accesspoint.aliyuncs.com
           #fs.oss.credentials.provider: org.apache.hadoop.fs.aliyun.oss.AliyunCredentialsProvider
  properties:
    hazelcast.invocation.max.retry.count: 20
    hazelcast.tcp.join.port.try.count: 30

Running Command

sh /data/seatunnel235/bin/seatunnel.sh --config ./config/v2.gxy.streaming.conf.template
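
The report says "sink.label-prefix" is edited by hand before every start. That step could be scripted along the following lines. This is a minimal sketch, not part of SeaTunnel: the function names `unique_label_prefix` and `render_config` are hypothetical, and the prefix layout (16 hex characters, then month/day and hour/minute) simply imitates the value shown in the config above. According to the report, a unique prefix alone does not prevent the error when several source tables are configured, so this only automates the existing step.

```python
import re
import uuid
from datetime import datetime


def unique_label_prefix(now=None):
    """Build a prefix shaped like the one in the report: label_<16 hex>_<MMDD>_<HHMM>."""
    now = now or datetime.now()
    return f"label_{uuid.uuid4().hex[:16]}_{now:%m%d_%H%M}"


def render_config(template_text, prefix):
    """Replace the value of sink.label-prefix in the config text with the given prefix."""
    pattern = re.compile(r'(sink\.label-prefix\s*=\s*)"[^"]*"')
    new_text, count = pattern.subn(lambda m: f'{m.group(1)}"{prefix}"', template_text)
    if count != 1:
        # Refuse to continue if the key is missing or appears more than once.
        raise ValueError(f"expected exactly one sink.label-prefix entry, found {count}")
    return new_text


if __name__ == "__main__":
    sample = 'sink {\n  Doris {\n    sink.label-prefix = "old_prefix"\n  }\n}\n'
    print(render_config(sample, unique_label_prefix()))
```

The rendered text would be written to the file passed to `--config` before running the command above.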

Error Exception

Error in the first case:
org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error] - stream load error: [LABEL_ALREADY_EXISTS]Status : errCode = 2, detailMessage = Label [test-cdc_tis_gxy_gxy_student_856834878388305923_0_8] has already been used, relate to txn [5981062]
2024-06-24 10:43:24,275 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [hz.main.seaTunnel.task.thread-31] - abort 5992921 for check label label_92e1051f8765498e_0624_1016_tis_gxy_gxy_student_857450112475987971_0_13.
2024-06-24 10:43:24,284 INFO  [o.a.s.c.d.s.w.DorisStreamLoad ] [hz.main.seaTunnel.task.thread-31] - abort for labelSuffix label_92e1051f8765498e_0624_1016_tis_gxy_gxy_student_857450112475987971_0 finished
2024-06-24 10:43:24,284 WARN  [o.a.s.e.s.TaskExecutionService] [hz.main.seaTunnel.task.thread-31] - [10.29.86.93]:5801 [seatunnel] [5.1] Exception in org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask@4812e351
java.lang.RuntimeException: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:27) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.handleRecord(IntermediateBlockingQueue.java:70) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.group.queue.IntermediateBlockingQueue.collect(IntermediateBlockingQueue.java:50) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.flow.IntermediateQueueFlowLifeCycle.collect(IntermediateQueueFlowLifeCycle.java:51) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.collect(TransformSeaTunnelTask.java:73) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:168) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.TransformSeaTunnelTask.call(TransformSeaTunnelTask.java:78) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:703) ~[seatunnel-starter.jar:2.3.5]
	at org.apache.seatunnel.engine.server.TaskExecutionService$NamedTaskWrapper.run(TaskExecutionService.java:1004) ~[seatunnel-starter.jar:2.3.5]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_91]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[?:1.8.0_91]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[?:1.8.0_91]
	at java.lang.Thread.run(Thread.java:745) [?:1.8.0_91]
Caused by: java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
	at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:234) ~[seatunnel-transforms-v2.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
	... 16 more
Caused by: java.util.concurrent.ExecutionException: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
	at java.util.concurrent.FutureTask.report(FutureTask.java:122) ~[?:1.8.0_91]
	at java.util.concurrent.FutureTask.get(FutureTask.java:192) ~[?:1.8.0_91]
	at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:232) ~[seatunnel-transforms-v2.jar:2.3.5]
	at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:191) ~[seatunnel-starter.jar:2.3.5]
	... 16 more
Caused by: org.apache.seatunnel.connectors.doris.exception.DorisConnectorException: ErrorCode:[Doris-01], ErrorDescription:[stream load error]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:220) ~[?:?]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:159) ~[?:?]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:145) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
	... 5 more
Caused by: java.io.IOException: Stream closed
	at java.util.zip.GZIPInputStream.ensureOpen(GZIPInputStream.java:62) ~[?:1.8.0_91]
	at java.util.zip.GZIPInputStream.read(GZIPInputStream.java:113) ~[?:1.8.0_91]
	at org.apache.http.client.entity.LazyDecompressingInputStream.read(LazyDecompressingInputStream.java:70) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
	at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284) ~[?:1.8.0_91]
	at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326) ~[?:1.8.0_91]
	at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178) ~[?:1.8.0_91]
	at java.io.InputStreamReader.read(InputStreamReader.java:184) ~[?:1.8.0_91]
	at java.io.Reader.read(Reader.java:140) ~[?:1.8.0_91]
	at org.apache.http.util.EntityUtils.toString(EntityUtils.java:247) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
	at org.apache.http.util.EntityUtils.toString(EntityUtils.java:291) ~[seatunnel-hadoop3-3.3.6-uber.jar:2.7-WS-test-SNAPSHOT]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.handlePreCommitResponse(DorisStreamLoad.java:205) ~[?:?]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisStreamLoad.stopLoad(DorisStreamLoad.java:218) ~[?:?]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.flush(DorisSinkWriter.java:159) ~[?:?]
	at org.apache.seatunnel.connectors.doris.sink.writer.DorisSinkWriter.prepareCommit(DorisSinkWriter.java:145) ~[?:?]
	at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.lambda$prepareCommit$4(MultiTableSinkWriter.java:217) ~[seatunnel-transforms-v2.jar:2.3.5]
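
One possible reading of the log above, offered as a hypothesis rather than a confirmed diagnosis: the label in the "abort ... for check label" line looks like prefix, database, table, a numeric id, a subtask index and a checkpoint number joined by underscores. If each source table gets its own sub-writer and all of them target the same Doris table, they would produce identical labels at the same checkpoint. The sketch below only illustrates that arithmetic. `build_label` and `build_label_with_source` are hypothetical helpers, the layout is inferred from the log and not taken from the connector source, and the second function is not an existing SeaTunnel option.

```python
from collections import Counter


def build_label(prefix, database, table, job_id, subtask_index, checkpoint_id):
    """Assumed layout, inferred from the log: prefix_db_table_id_subtask_checkpoint."""
    return f"{prefix}_{database}_{table}_{job_id}_{subtask_index}_{checkpoint_id}"


def build_label_with_source(prefix, source_table, database, table,
                            job_id, subtask_index, checkpoint_id):
    """Hypothetical variant that also encodes the source table name."""
    source_part = source_table.replace(".", "_")
    return build_label(f"{prefix}_{source_part}", database, table,
                       job_id, subtask_index, checkpoint_id)


PREFIX = "label_92e1051f8765498e_0624_1016"
JOB_ID = 857450112475987971  # numeric segment copied from the log; its meaning is assumed
SOURCE_TABLES = [f"practice.gxy_student_{i}" for i in range(5)]

# Assumption: one sub-writer per source table, all writing to tis_gxy.gxy_student
# from subtask 0 and all flushing at the same checkpoint (13).
colliding = [build_label(PREFIX, "tis_gxy", "gxy_student", JOB_ID, 0, 13)
             for _ in SOURCE_TABLES]
distinct = [build_label_with_source(PREFIX, t, "tis_gxy", "gxy_student", JOB_ID, 0, 13)
            for t in SOURCE_TABLES]

if __name__ == "__main__":
    print(Counter(colliding).most_common(1))  # the single label shared by all writers
    print(len(set(distinct)))                 # number of distinct labels in the variant
```

Under these assumptions, changing the prefix between runs would not help, because the five writers share the prefix within a single run. That would be consistent with the observation that a single-table source works.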

Error in the second case:
java.lang.RuntimeException: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:262)
    at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:68)
    at org.apache.seatunnel.engine.server.task.SeaTunnelTransformCollector.collect(SeaTunnelTransformCollector.java:39)
    ...
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.subSinkErrorCheck(MultiTableSinkWriter.java:121)
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableSinkWriter.prepareCommit(MultiTableSinkWriter.java:201)
    ...
Caused by: java.lang.InterruptedException
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
    at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2088)
    at java.util.concurrent.LinkedBlockingQueue.poll(LinkedBlockingQueue.java:467)
    at org.apache.seatunnel.connectors.seatunnel.common.multitablesink.MultiTableWriterRunnable.run(MultiTableWriterRunnable.java:47)
    ...

Zeta or Flink or Spark Version

No response

Java or Scala Version

1.8.0_181

Screenshots

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct
