Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve] refactor clickhouse connector, using Factory to create source/sink #6975

Open
wants to merge 19 commits into
base: dev
Choose a base branch
from

Conversation

liunaijie
Copy link
Contributor

Purpose of this pull request

refactor clickhouse connector, using Factory to create source/sink

Does this PR introduce any user-facing change?

no

How was this patch tested?

using existing test

Check list

@liunaijie liunaijie marked this pull request as ready for review June 13, 2024 05:10
new Shard(1, 1, nodes.get(0)),
readonlyConfig.get(USERNAME),
readonlyConfig.get(PASSWORD));
proxy.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add try.. catch and finally to close proxy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

NodePassConfig::getNodeAddress,
NodePassConfig::getPassword));

proxy.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as above.

factoryIdentifier(), PluginType.SOURCE, e.getMessage()));
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Override
public <T, SplitT extends SourceSplit, StateT extends Serializable>
        TableSource<T, SplitT, StateT> createSource(TableSourceFactoryContext context)
                throws PrepareFailException {
    ReadonlyConfig config = context.getOptions();

    List<ClickHouseNode> servers =
            ClickhouseUtil.createNodes(
                    config.get(HOST),
                    config.get(DATABASE),
                    config.get(SERVER_TIME_ZONE),
                    config.get(USERNAME),
                    config.get(PASSWORD));

    String sql = config.get(SQL);
    ClickHouseNode currentServer =
            servers.get(ThreadLocalRandom.current().nextInt(servers.size()));

    CatalogTable catalogTable;
    try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
            ClickHouseResponse response =
                    client.connect(currentServer)
                            .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                            .query(modifySQLToLimit1(sql))
                            .executeAndWait()) {

        TableSchema.Builder builder = TableSchema.builder();
        List<ClickHouseColumn> columns = response.getColumns();
        columns.stream()
                .forEach(
                        column -> {
                            PhysicalColumn physicalColumn =
                                    PhysicalColumn.of(
                                            column.getColumnName(),
                                            TypeConvertUtil.convert(column),
                                            Long.valueOf(column.getEstimatedLength()),
                                            column.getScale(),
                                            column.isNullable(),
                                            null,
                                            null);
                            builder.column(physicalColumn);
                        });
        String catalogName = "clickhouse_catalog";
        catalogTable =
                CatalogTable.of(
                        TableIdentifier.of(catalogName, config.get(DATABASE), "default", "default"),
                        builder.build(),
                        new HashMap<>(),
                        new ArrayList<>(),
                        "",
                        catalogName);
    } catch (ClickHouseException e) {
        throw new ClickhouseConnectorException(
                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                String.format(
                        "PluginName: %s, PluginType: %s, Message: %s",
                        factoryIdentifier(), PluginType.SOURCE, e.getMessage()));
    }

A reference

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks, will update the code.
and one more thing i want say. now clickhouse source doesn't has table_name parameter, only has query. So we can't know the table name we queryed (or need parse the sql).
Maybe we can add table_name is next version, then when we generate the CatalogTable, we can pass the real name instead of default

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can emulate the jdbc source approach

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done, and i check the jdbc code, if only pass query will generate the default.default table won't parse sql.

@Carl-Zhou-CN
Copy link
Member

Close when this pr merges #6416

@Carl-Zhou-CN
Copy link
Member

good job

@@ -17,177 +17,33 @@

package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little worried whether clickhousefile can still work properly because we lack test cases for this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Hisoka-X Do we need to add it in this PR

@liunaijie
Copy link
Contributor Author

@Carl-Zhou-CN PTAL when you have time

Comment on lines +70 to +72
@Override
public TableSink<SeaTunnelRow, ClickhouseSinkState, CKFileCommitInfo, CKFileAggCommitInfo>
createSink(TableSinkFactoryContext context) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should add a test case to cover this part. To make sure the sink can be created normally. We can do it in Clickhouse E2E (there has clickhouse container). CC @liunaijie @Carl-Zhou-CN

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

support

node_free_password = true
username = "default"
password = ""
clickhouse_local_path = "/bin/clickhouse local"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tip: When users use the clickhousefile connector, they need to download a clickhouse binary file themselves. So if we want add e2e for clickhousefile. We should download clickhouse-local program in container too. Please check https://clickhouse.com/docs/en/operations/utilities/clickhouse-local#download-clickhouse-local

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants