command) {
+ return super.executeUndefinedCommand(command);
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSource.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSource.java
new file mode 100644
index 00000000..834bd669
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSource.java
@@ -0,0 +1,75 @@
+package org.jetlinks.community.datasource;
+
+import org.jetlinks.core.command.CommandException;
+import org.jetlinks.core.command.CommandSupport;
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+
+/**
+ * 统一数据源接口定义,{@link DataSource#getType()}标识数据源类型.
+ *
+ * 数据源统一管理,请勿手动调用{@link DataSource#dispose()}
+ *
+ * @author zhouhao
+ * @since 1.10
+ */
+public interface DataSource extends CommandSupport, Disposable {
+
+ /**
+ * @return 数据源ID
+ */
+ String getId();
+
+ /**
+ * @return 数据源类型
+ */
+ DataSourceType getType();
+
+ /**
+ * 执行指令,具体指令有对应的数据源实现定义.
+ *
+ * @param command 指令
+ * @param 结果类型
+ * @return void
+ * @see UnsupportedOperationException
+ */
+ @Nonnull
+ @Override
+ default R execute(@Nonnull org.jetlinks.core.command.Command command) {
+ throw new CommandException.NoStackTrace(this, command, "error.unsupported_command");
+ }
+
+ /**
+ * 获取数据源状态
+ *
+ * @return 状态
+ * @see DataSourceState
+ */
+ default Mono state() {
+ return Mono.just(DataSourceState.ok);
+ }
+
+ /**
+ * 判断数据源是为指定的类型
+ *
+ * @param target 类型
+ * @return 是否为指定的类型
+ */
+ default boolean isWrapperFor(java.lang.Class> target) {
+ return target.isInstance(this);
+ }
+
+ /**
+ * 按指定类型拆箱数据源,返回对应的数据源。如果类型不一致,可能抛出{@link ClassCastException}
+ *
+ * @param target 目标类型
+ * @param T
+ * @return 数据源
+ */
+ default T unwrap(Class target) {
+ return target.cast(this);
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfig.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfig.java
new file mode 100644
index 00000000..90e9aa25
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfig.java
@@ -0,0 +1,22 @@
+package org.jetlinks.community.datasource;
+
+import lombok.Generated;
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.community.ValueObject;
+
+import java.util.Map;
+
+@Getter
+@Setter
+@Generated
+public class DataSourceConfig implements ValueObject {
+ private String id;
+ private String typeId;
+ private Map configuration;
+
+ @Override
+ public Map values() {
+ return configuration;
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfigManager.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfigManager.java
new file mode 100644
index 00000000..b5bb6ae2
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConfigManager.java
@@ -0,0 +1,36 @@
+package org.jetlinks.community.datasource;
+
+import reactor.core.Disposable;
+import reactor.core.publisher.Mono;
+
+import java.util.function.BiFunction;
+
+/**
+ * 数据源配置管理器,统一管理数据源配置
+ *
+ * @author zhouhao
+ * @since 1.10
+ */
+public interface DataSourceConfigManager {
+
+ /**
+ * 根据类型ID和数据源ID获取配置
+ *
+ * @param typeId 类型ID
+ * @param datasourceId 数据源ID
+ * @return 配置信息
+ */
+ Mono getConfig(String typeId, String datasourceId);
+
+ /**
+ * 监听配置变化,当有配置变化后将调用回调参数
+ *
+ * @param callback 回调参数
+ */
+ Disposable doOnConfigChanged(BiFunction> callback);
+
+ enum ConfigState{
+ normal,
+ disabled
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConstants.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConstants.java
new file mode 100644
index 00000000..e24ac857
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceConstants.java
@@ -0,0 +1,46 @@
+package org.jetlinks.community.datasource;
+
+import org.jetlinks.core.command.Command;
+import org.jetlinks.core.command.CommandSupport;
+import org.jetlinks.core.command.CommandUtils;
+import org.jetlinks.core.metadata.FunctionMetadata;
+import org.jetlinks.core.metadata.SimpleFunctionMetadata;
+import org.jetlinks.community.command.CommandSupportManagerProviders;
+import reactor.core.publisher.Mono;
+
+import java.util.function.Consumer;
+
+public interface DataSourceConstants {
+
+
+ interface Commands {
+
+ static String createCommandProvider(String dataSourceId) {
+ return "datasource$" + dataSourceId;
+ }
+
+ static Mono getCommandSupport(String datasourceId) {
+ return CommandSupportManagerProviders
+ .getCommandSupport(createCommandProvider(datasourceId));
+ }
+
+ static Mono getCommandSupport(String datasourceId, String supportId) {
+ return CommandSupportManagerProviders
+ .getCommandSupport(createCommandProvider(datasourceId), supportId);
+ }
+
+ }
+
+ interface Metadata {
+
+ static FunctionMetadata create(@SuppressWarnings("all") Class extends Command> cmdType,
+ Consumer handler) {
+ SimpleFunctionMetadata metadata = new SimpleFunctionMetadata();
+ metadata.setId(CommandUtils.getCommandIdByType(cmdType));
+ metadata.setName(metadata.getId());
+ handler.accept(metadata);
+ return metadata;
+ }
+
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceManager.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceManager.java
new file mode 100644
index 00000000..f4fe0caa
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceManager.java
@@ -0,0 +1,84 @@
+package org.jetlinks.community.datasource;
+
+import reactor.core.publisher.Flux;
+import org.jetlinks.community.datasource.exception.DataSourceNotExistException;
+import reactor.core.publisher.Mono;
+
+import java.util.List;
+
+/**
+ * 数据源管理器,用于统一管理数据源
+ *
+ * @author zhouhao
+ * @since 1.10
+ */
+public interface DataSourceManager {
+
+ /**
+ * 获取支持的数据源类型
+ *
+ * @return 数据源类型
+ */
+ List getSupportedType();
+
+ /**
+ * 根据类型获取数据眼供应商
+ *
+ * @param typeId 类型ID
+ * @return 数据源供应商
+ */
+ DataSourceProvider getProvider(String typeId);
+
+
+ /**
+ * 根据类型ID获取已存在的数据源
+ *
+ * @param typeId 类型ID
+ * @return 数据源列表
+ */
+ Flux getDataSources(String typeId);
+
+ /**
+ * 获取指定的数据源,如果数据源不存在则返回{@link Mono#empty()}
+ *
+ * @param type 数据源类型
+ * @param datasourceId 数据源ID
+ * @return 数据源
+ */
+ Mono getDataSource(DataSourceType type, String datasourceId);
+
+ /**
+ * 获取指定的数据源,如果数据源不存在则抛出异常{@link DataSourceNotExistException}
+ *
+ * @param type 数据源类型
+ * @param datasourceId 数据源ID
+ * @return 数据源
+ * @see DataSourceNotExistException
+ */
+ default Mono getDataSourceOrError(DataSourceType type, String datasourceId) {
+ return getDataSource(type, datasourceId)
+ .switchIfEmpty(Mono.error(() -> new DataSourceNotExistException(type, datasourceId)));
+ }
+
+ /**
+ * 获取指定的数据源,如果数据源不存在则返回{@link Mono#empty()}
+ *
+ * @param typeId 数据源类型ID
+ * @param datasourceId 数据源ID
+ * @return 数据源
+ */
+ Mono getDataSource(String typeId, String datasourceId);
+
+ /**
+ * 获取指定的数据源,如果数据源不存在则抛出异常{@link DataSourceNotExistException}
+ *
+ * @param typeId 数据源类型ID
+ * @param datasourceId 数据源ID
+ * @return 数据源
+ * @see DataSourceNotExistException
+ */
+ default Mono getDataSourceOrError(String typeId, String datasourceId) {
+ return getDataSource(typeId, datasourceId)
+ .switchIfEmpty(Mono.error(() -> new DataSourceNotExistException(typeId, datasourceId)));
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceProvider.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceProvider.java
new file mode 100644
index 00000000..adf03ace
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceProvider.java
@@ -0,0 +1,101 @@
+package org.jetlinks.community.datasource;
+
+import org.jetlinks.core.command.Command;
+import org.jetlinks.core.command.CommandHandler;
+import org.jetlinks.core.monitor.Monitor;
+import reactor.core.publisher.Mono;
+
+import javax.annotation.Nonnull;
+import java.util.Map;
+
+/**
+ * 数据源提供商,用于提供对数据源的支持.
+ *
+ * @author zhouhao
+ * @see DataSource
+ * @since 1.10
+ */
+public interface DataSourceProvider {
+
+ /**
+ * @return 数据源类型
+ */
+ @Nonnull
+ DataSourceType getType();
+
+ /**
+ * 根据数据源配置来创建数据源
+ *
+ * @param properties 数据源配置
+ * @return 数据源
+ */
+ @Nonnull
+ Mono createDataSource(@Nonnull DataSourceConfig properties);
+
+ /**
+ * 使用新的配置来重新加载数据源
+ *
+ * @param dataSource 数据源
+ * @param properties 配置
+ * @return 重新加载后的数据源
+ */
+ @Nonnull
+ Mono reload(@Nonnull DataSource dataSource,
+ @Nonnull DataSourceConfig properties);
+
+ /**
+ * 创建命令支持,用于提供针对某个数据源的命令支持.
+ *
+ * 命令执行过程中请使用{@link CommandConfiguration#getMonitor()}进行日志打印以及链路追踪。
+ *
+ * @return 命令支持
+ * @since 2.3
+ */
+ default Mono> createCommandHandler(CommandConfiguration configuration) {
+ return Mono.empty();
+ }
+
+ /**
+ * 命令配置
+ */
+ interface CommandConfiguration {
+
+ /**
+ * 获取命令ID
+ *
+ * @return 命令ID
+ * @see Command#getCommandId()
+ */
+ String getCommandId();
+
+ /**
+ * 获取命令名称
+ *
+ * @return 命令名称
+ */
+ String getCommandName();
+
+ /**
+ * 获取命令配置信息
+ *
+ * @return 配置信息
+ */
+ Map getConfiguration();
+
+ /**
+ * 获取数据源
+ *
+ * @return 数据源
+ * @see DataSource#isWrapperFor(Class)
+ * @see DataSource#unwrap(Class)
+ */
+ Mono getDataSource();
+
+ /**
+ * 获取监控器,用于日志打印,链路追踪等
+ *
+ * @return 监控器
+ */
+ Monitor getMonitor();
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceState.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceState.java
new file mode 100644
index 00000000..3cfb2f13
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceState.java
@@ -0,0 +1,38 @@
+package org.jetlinks.community.datasource;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.SneakyThrows;
+
+@AllArgsConstructor
+@Getter
+public class DataSourceState {
+ //正常
+ public static String code_ok = "ok";
+ //正常
+ public static String code_stopped = "stopped";
+ //正常
+ public static String code_error = "error";
+
+ public static final DataSourceState ok = new DataSourceState(code_ok, null);
+ public static final DataSourceState stopped = new DataSourceState(code_stopped, null);
+
+ private final String code;
+
+ private final Throwable reason;
+
+ public boolean isOk(){
+ return code_ok.equals(code);
+ }
+
+ public static DataSourceState error(Throwable error) {
+ return new DataSourceState(code_error, error);
+ }
+
+ @SneakyThrows
+ public void validate() {
+ if (reason != null) {
+ throw reason;
+ }
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceType.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceType.java
new file mode 100644
index 00000000..080391be
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DataSourceType.java
@@ -0,0 +1,7 @@
+package org.jetlinks.community.datasource;
+
+public interface DataSourceType {
+ String getId();
+
+ String getName();
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DefaultDataSourceManager.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DefaultDataSourceManager.java
new file mode 100644
index 00000000..55e264cd
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/DefaultDataSourceManager.java
@@ -0,0 +1,235 @@
+package org.jetlinks.community.datasource;
+
+import lombok.AllArgsConstructor;
+import lombok.EqualsAndHashCode;
+import lombok.Generated;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import org.jetlinks.core.cache.ReactiveCacheContainer;
+import reactor.core.Disposable;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.util.function.Tuple2;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+/**
+ * 默认数据源管理器
+ *
+ * 获取数据源时,如果数据源不存在.
+ * 则尝试从{@link DataSourceConfigManager#getConfig(String, String)}获取配置,
+ * 然后调用{@link DataSourceConfig#getTypeId()}对应的{@link DataSourceProvider#getType()}
+ * 进行初始化{@link DataSourceProvider#createDataSource(DataSourceConfig)}.
+ *
+ *
+ * 通过实现{@link DataSourceProvider}并注入到Spring容器中,即可实现自定义数据源.
+ *
+ *
+ * 当数据源配置发生变化时,将自动重新加载数据源.
+ *
+ * @author zhouhao
+ * @see DataSourceProvider
+ * @see DataSource
+ * @since 1.9
+ */
+@Slf4j
+public class DefaultDataSourceManager implements DataSourceManager {
+
+ private final Map providers = new ConcurrentHashMap<>();
+
+ private final ReactiveCacheContainer cachedDataSources = ReactiveCacheContainer.create();
+
+ private final DataSourceConfigManager dataSourceConfigManager;
+
+ public DefaultDataSourceManager(DataSourceConfigManager configManager) {
+ this.dataSourceConfigManager = configManager;
+ this.dataSourceConfigManager
+ .doOnConfigChanged((state, properties) -> {
+ //禁用,则删除数据源
+ if (state == DataSourceConfigManager.ConfigState.disabled) {
+ this.removeDataSource(properties.getTypeId(), properties.getId());
+ } else {
+
+ if (cachedDataSources.containsKey(new CacheKey(properties.getTypeId(), properties.getId()))) {
+ //重新加载
+ return this
+ .reloadDataSource(properties.getTypeId(), properties.getId())
+ .then();
+ }
+
+ }
+ return Mono.empty();
+ });
+ }
+
+ /**
+ * 注册一个数据源提供商
+ *
+ * @param provider 数据源提供商
+ * @see DataSourceProvider
+ */
+ public void register(DataSourceProvider provider) {
+ log.debug("Register DataSource {} Provider {}", provider.getType().getId(), provider);
+ providers.put(provider.getType().getId(), provider);
+ }
+
+ /**
+ * 注册一个已经初始化的数据源
+ *
+ * @param dataSource 数据源
+ * @see DataSource
+ */
+ public void register(DataSource dataSource) {
+ log.debug("Register DataSource {} {}", dataSource.getType().getId(), dataSource);
+ CacheKey key = new CacheKey(dataSource.getType().getId(), dataSource.getId());
+ cachedDataSources.put(key, dataSource);
+ }
+
+ @Override
+ public DataSourceProvider getProvider(String typeId) {
+ DataSourceProvider dataSourceProvider = providers.get(typeId);
+ if (dataSourceProvider == null) {
+ throw new UnsupportedOperationException("不支持的数据源类型:" + typeId);
+ }
+ return dataSourceProvider;
+ }
+
+ @Override
+ public Flux getDataSources(String typeId) {
+ return cachedDataSources
+ .values()
+ .filter(dataSource -> Objects.equals(typeId, dataSource.getType().getId()));
+ }
+
+ @Override
+ public List getSupportedType() {
+ return providers
+ .values()
+ .stream()
+ .map(DataSourceProvider::getType)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Mono getDataSource(DataSourceType type,
+ String datasourceId) {
+ return getDataSource(type.getId(), datasourceId);
+ }
+
+ @Override
+ public Mono getDataSource(String typeId, String datasourceId) {
+ return getOrCreateRef(typeId, datasourceId);
+ }
+
+ /**
+ * 获取数据源引用缓存,如果没有则自动加载,如果数据源不存在,不会立即报错.
+ * 在使用{@link DataSourceRef#getRef()}才会返回错误.
+ *
+ * @param typeId 数据源类型ID
+ * @param datasourceId 数据源ID
+ * @return 数据源引用
+ */
+ private Mono getOrCreateRef(String typeId, String datasourceId) {
+ return cachedDataSources
+ .computeIfAbsent(new CacheKey(typeId, datasourceId),
+ key -> loadDataSource(key.type, key.datasourceId));
+
+ }
+
+ public Mono> loadConfigAndProvider(String typeId, String datasourceId) {
+ return Mono
+ .zip(
+ dataSourceConfigManager.getConfig(typeId, datasourceId),
+ Mono
+ .justOrEmpty(providers.get(typeId))
+ .switchIfEmpty(Mono.error(() -> new UnsupportedOperationException("unsupported datasource type " + typeId)))
+ );
+
+ }
+
+ public Mono loadDataSource(String typeId, String datasourceId) {
+ return this
+ .loadConfigAndProvider(typeId, datasourceId)
+ .flatMap(tp2 -> tp2.getT2().createDataSource(tp2.getT1()))
+ .doOnNext(dataSource -> log
+ .debug("load {} datasource [{}]", dataSource.getType().getId(), dataSource.getId()));
+
+ }
+
+ private void removeDataSource(String typeId, String datasourceId) {
+ cachedDataSources.remove(new CacheKey(typeId, datasourceId));
+ }
+
+ private Mono validateDataSource(DataSourceProvider provider, DataSourceConfig config) {
+
+ return provider
+ .createDataSource(config)
+ .flatMap(dataSource -> dataSource
+ .state()
+ .doOnNext(DataSourceState::validate)
+ //销毁测试数据源
+ .doAfterTerminate(dataSource::dispose)
+ .then()
+ );
+ }
+
+ private Mono reloadDataSource(String typeId, String datasourceId) {
+
+
+ return this
+ .loadConfigAndProvider(typeId, datasourceId)
+ //先校验一下
+ .flatMap(tp2 -> this
+ .validateDataSource(tp2.getT2(), tp2.getT1())
+ .thenReturn(tp2))
+ .flatMap(tp2 -> cachedDataSources
+ .compute(
+ new CacheKey(typeId, datasourceId),
+ (key, old) -> {
+ if (old != null) {
+ return tp2.getT2().reload(old, tp2.getT1());
+ }
+ return tp2.getT2().createDataSource(tp2.getT1());
+
+ }
+ ))
+ .doOnError(err -> log.error("reload {} datasource [{}] error ", typeId, datasourceId, err));
+
+ }
+
+ @AllArgsConstructor
+ @EqualsAndHashCode
+ static class CacheKey {
+ private final String type;
+ private final String datasourceId;
+ }
+
+ static class DataSourceRef implements Disposable {
+
+ @Getter
+ private volatile Mono ref;
+
+ private boolean disposed = false;
+
+ public DataSourceRef(Mono ref) {
+ this.ref = ref;
+ }
+
+ @Override
+ public void dispose() {
+ ref = Mono.empty();
+ disposed = true;
+ }
+
+ @Override
+ @Generated
+ public boolean isDisposed() {
+ return disposed;
+ }
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/CommandHandlerProvider.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/CommandHandlerProvider.java
new file mode 100644
index 00000000..a7573033
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/CommandHandlerProvider.java
@@ -0,0 +1,16 @@
+package org.jetlinks.community.datasource.command;
+
+import org.jetlinks.core.command.CommandHandler;
+import org.jetlinks.community.datasource.DataSourceProvider;
+import org.jetlinks.community.spi.Provider;
+import reactor.core.publisher.Mono;
+
+public interface CommandHandlerProvider {
+
+ Provider supports = Provider.create(CommandHandlerProvider.class);
+
+ String getType();
+
+ Mono> createCommandHandler(DataSourceProvider.CommandConfiguration configuration);
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandConfig.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandConfig.java
new file mode 100644
index 00000000..d5cb3d63
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandConfig.java
@@ -0,0 +1,48 @@
+package org.jetlinks.community.datasource.command;
+
+import lombok.Getter;
+import lombok.Setter;
+import org.jetlinks.core.command.Command;
+import org.jetlinks.community.datasource.DataSource;
+
+import java.util.Map;
+
+@Getter
+@Setter
+public class DataSourceCommandConfig {
+
+ /**
+ * 数据源类型
+ *
+ * @see DataSource#getType()
+ */
+ private String datasourceType;
+ /**
+ * 数据源ID
+ *
+ * @see DataSource#getId()
+ */
+ private String datasourceId;
+
+ /**
+ * 命令支持ID,比如一个数据源命令分类.
+ *
+ * @see org.jetlinks.community.command.CommandSupportManagerProvider#getCommandSupport(String, Map)
+ */
+ private String supportId;
+
+ /**
+ * 命令ID
+ *
+ * @see Command#getCommandId()
+ */
+ private String commandId;
+
+ private String commandName;
+
+ /**
+ * 命令配置信息
+ */
+ private Map configuration;
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandSupportManager.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandSupportManager.java
new file mode 100644
index 00000000..482218b8
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/command/DataSourceCommandSupportManager.java
@@ -0,0 +1,238 @@
+package org.jetlinks.community.datasource.command;
+
+import lombok.AllArgsConstructor;
+import org.jetlinks.core.Lazy;
+import org.jetlinks.core.command.*;
+import org.jetlinks.core.event.EventBus;
+import org.jetlinks.core.lang.SeparatedCharSequence;
+import org.jetlinks.core.lang.SharedPathString;
+import org.jetlinks.core.monitor.Monitor;
+import org.jetlinks.core.monitor.logger.Logger;
+import org.jetlinks.core.monitor.logger.Slf4jLogger;
+import org.jetlinks.core.monitor.metrics.Metrics;
+import org.jetlinks.core.monitor.tracer.SimpleTracer;
+import org.jetlinks.core.monitor.tracer.Tracer;
+import org.jetlinks.community.command.CommandSupportManagerProvider;
+import org.jetlinks.community.datasource.DataSource;
+import org.jetlinks.community.datasource.DataSourceConstants;
+import org.jetlinks.community.datasource.DataSourceManager;
+import org.jetlinks.community.datasource.DataSourceProvider;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+import org.springframework.util.StringUtils;
+import reactor.core.Disposable;
+import reactor.core.Disposables;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+
+@AllArgsConstructor
+public abstract class DataSourceCommandSupportManager {
+
+ public static final String defaultSupportId = "@@default";
+
+ protected final DataSourceManager dataSourceManager;
+
+ private final Map supports = new ConcurrentHashMap<>();
+
+ protected Mono registerCommand(DataSourceCommandConfig config) {
+ return supports
+ .computeIfAbsent(
+ config.getDatasourceId(),
+ datasourceId -> new DataSourceCommandSupportProvider(this, datasourceId))
+ .register(config);
+ }
+
+ protected Mono unregisterCommand(DataSourceCommandConfig config) {
+ return Mono.fromRunnable(() -> {
+ supports.compute(
+ config.getDatasourceId(),
+ (datasourceId,
+ provider) -> {
+ if (provider != null) {
+ provider.unregister(config);
+ if (provider.isEmpty()) {
+ provider.dispose();
+ return null;
+ }
+ }
+ return null;
+ });
+ });
+ }
+
+ protected abstract Mono getCommandSupportInfo(String datasourceId, String supportId);
+
+ static class DataSourceCommandSupportProvider
+ implements CommandSupportManagerProvider {
+
+ private final DataSourceCommandSupportManager parent;
+ private final String datasourceId;
+ private final Disposable.Composite disposable = Disposables.composite();
+ private final Map commandSupports = new ConcurrentHashMap<>();
+
+ DataSourceCommandSupportProvider(DataSourceCommandSupportManager parent,
+ String datasourceId) {
+ this.datasourceId = datasourceId;
+ this.parent = parent;
+ this.disposable.add(
+ CommandSupportManagerProvider
+ .supports
+ .register(this.getProvider(), this)
+ );
+ }
+
+ void dispose() {
+ this.disposable.dispose();
+ }
+
+ boolean isEmpty() {
+ return commandSupports.isEmpty();
+ }
+
+ //注册命令
+ public Mono register(DataSourceCommandConfig config) {
+ return parent
+ .dataSourceManager
+ .getProvider(config.getDatasourceType())
+ .createCommandHandler(new CommandConfigurationImpl(config, parent.dataSourceManager))
+ .doOnNext(handler -> registerHandler(config, handler))
+ .then();
+ }
+
+
+ private void registerHandler(DataSourceCommandConfig config, CommandHandler, ?> handler) {
+ String supportId = StringUtils.hasText(config.getSupportId()) ? config.getSupportId() : defaultSupportId;
+
+ commandSupports
+ .computeIfAbsent(supportId,
+ id -> new DataSourceCommandSupport())
+ .registerHandler(config.getCommandId(),
+ handler);
+ }
+
+ //注销命令
+ public void unregister(DataSourceCommandConfig config) {
+ String supportId = StringUtils.hasText(config.getSupportId()) ? config.getSupportId() : defaultSupportId;
+ commandSupports.compute(supportId, (id, support) -> {
+ if (support != null) {
+ support.unregister(config.getCommandId());
+ if (support.isEmpty()) {
+ return null;
+ }
+ }
+ return null;
+ });
+ }
+
+ @Override
+ public String getProvider() {
+ return DataSourceConstants.Commands.createCommandProvider(datasourceId);
+ }
+
+ @Override
+ public Mono extends CommandSupport> getCommandSupport(String id, Map options) {
+ if (id == null || Objects.equals(getProvider(), id)) {
+ id = defaultSupportId;
+ }
+
+ return Mono.justOrEmpty(commandSupports.get(id));
+ }
+
+ @Override
+ public Flux getSupportInfo() {
+ return Flux
+ .fromIterable(commandSupports.entrySet())
+ .flatMap(e -> {
+ String id = e.getKey();
+ if (Objects.equals(id, defaultSupportId)) {
+ id = null;
+ }
+ return parent.getCommandSupportInfo(datasourceId, id);
+ }, 8);
+ }
+
+ static class DataSourceCommandSupport extends AbstractCommandSupport {
+ @Override
+ protected , R> void registerHandler(String id, CommandHandler handler) {
+ super.registerHandler(id, handler);
+ }
+
+ void unregister(String commandId) {
+ handlers.remove(commandId);
+ }
+
+ boolean isEmpty() {
+ return handlers.isEmpty();
+ }
+ }
+ }
+
+
+ static class CommandConfigurationImpl extends Slf4jLogger
+ implements DataSourceProvider.CommandConfiguration, Monitor {
+ private final DataSourceCommandConfig config;
+ private final DataSourceManager manager;
+ private final Tracer tracer;
+
+ CommandConfigurationImpl(DataSourceCommandConfig config,
+ DataSourceManager manager) {
+ super(LoggerFactory.getLogger("org.jetlinks.community.datasource." + config.getDatasourceType()));
+ this.config = config;
+ this.manager = manager;
+ // TODO 企业版支持 链路追踪。
+ this.tracer = Tracer.noop();
+ }
+
+ @Override
+ public String getCommandId() {
+ return config.getCommandId();
+ }
+
+ @Override
+ public String getCommandName() {
+ return config.getCommandName();
+ }
+
+ @Override
+ public Map getConfiguration() {
+ return config.getConfiguration();
+ }
+
+ @Override
+ public Mono getDataSource() {
+ return manager.getDataSource(config.getDatasourceType(), config.getDatasourceId());
+ }
+
+ @Override
+ public Monitor getMonitor() {
+ return this;
+ }
+
+ @Override
+ public Logger logger() {
+ return this;
+ }
+
+ @Override
+ public Tracer tracer() {
+ return tracer;
+ }
+
+ @Override
+ public Metrics metrics() {
+ return Metrics.noop();
+ }
+
+ @Override
+ public void log(Level level, String message, Object... args) {
+ // TODO 企业版支持 页面中实时查看日志。
+ super.log(level, message, args);
+ }
+ }
+
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderConfiguration.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderConfiguration.java
new file mode 100644
index 00000000..3844b9bb
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderConfiguration.java
@@ -0,0 +1,14 @@
+package org.jetlinks.community.datasource.configuration;
+
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.context.annotation.Bean;
+
+@AutoConfiguration
+public class DataSourceHandlerProviderConfiguration {
+
+ @Bean
+ public DataSourceHandlerProviderRegister dataSourceHandlerProviderRegister() {
+ return new DataSourceHandlerProviderRegister();
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderRegister.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderRegister.java
new file mode 100644
index 00000000..f3350bf5
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceHandlerProviderRegister.java
@@ -0,0 +1,31 @@
+package org.jetlinks.community.datasource.configuration;
+
+import org.jetlinks.community.datasource.command.CommandHandlerProvider;
+import org.springframework.beans.BeansException;
+import org.springframework.beans.factory.SmartInitializingSingleton;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.ApplicationContextAware;
+
+import javax.annotation.Nonnull;
+import java.util.List;
+
+public class DataSourceHandlerProviderRegister implements ApplicationContextAware, SmartInitializingSingleton {
+
+ private ApplicationContext context;
+
+ @Override
+ public void setApplicationContext(@Nonnull ApplicationContext applicationContext) throws BeansException {
+ this.context = applicationContext;
+ }
+
+ @Override
+ public void afterSingletonsInstantiated() {
+
+ List commandHandlerProviders = context
+ .getBeanProvider(CommandHandlerProvider.class)
+ .stream()
+ .toList();
+
+ commandHandlerProviders.forEach(provider -> CommandHandlerProvider.supports.register(provider.getType(), provider));
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceManagerConfiguration.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceManagerConfiguration.java
new file mode 100644
index 00000000..d1cc094c
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/configuration/DataSourceManagerConfiguration.java
@@ -0,0 +1,24 @@
+package org.jetlinks.community.datasource.configuration;
+
+import org.jetlinks.community.datasource.*;
+import org.jetlinks.community.datasource.*;
+import org.springframework.beans.factory.ObjectProvider;
+import org.springframework.boot.autoconfigure.AutoConfiguration;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.context.annotation.Bean;
+
+@AutoConfiguration
+public class DataSourceManagerConfiguration {
+
+ @Bean
+ @ConditionalOnBean(DataSourceConfigManager.class)
+ public DataSourceManager dataSourceManager(DataSourceConfigManager dataSourceConfigManager,
+ ObjectProvider providers,
+ ObjectProvider dataSources){
+ DefaultDataSourceManager dataSourceManager= new DefaultDataSourceManager(dataSourceConfigManager);
+ providers.forEach(dataSourceManager::register);
+ dataSources.forEach(dataSourceManager::register);
+ return dataSourceManager;
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/exception/DataSourceNotExistException.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/exception/DataSourceNotExistException.java
new file mode 100644
index 00000000..3f5dc2fd
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/exception/DataSourceNotExistException.java
@@ -0,0 +1,15 @@
+package org.jetlinks.community.datasource.exception;
+
+import org.hswebframework.web.exception.I18nSupportException;
+import org.jetlinks.community.datasource.DataSourceType;
+
+public class DataSourceNotExistException extends I18nSupportException {
+
+ public DataSourceNotExistException(DataSourceType datasourceType, String dataSourceId) {
+ this(datasourceType.getId(), dataSourceId);
+ }
+
+ public DataSourceNotExistException(String datasourceType, String dataSourceId) {
+ super("error.datasource_not_exist", datasourceType, dataSourceId);
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/DefaultRDBDataSource.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/DefaultRDBDataSource.java
new file mode 100644
index 00000000..933910ea
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/DefaultRDBDataSource.java
@@ -0,0 +1,345 @@
+package org.jetlinks.community.datasource.rdb;
+
+import com.zaxxer.hikari.HikariDataSource;
+import io.r2dbc.pool.ConnectionPool;
+import io.r2dbc.pool.ConnectionPoolConfiguration;
+import io.r2dbc.spi.Connection;
+import lombok.SneakyThrows;
+import org.apache.commons.collections4.MapUtils;
+import org.hswebframework.ezorm.rdb.executor.SqlRequest;
+import org.hswebframework.ezorm.rdb.executor.reactive.ReactiveSyncSqlExecutor;
+import org.hswebframework.ezorm.rdb.executor.wrapper.ResultWrapper;
+import org.hswebframework.ezorm.rdb.metadata.RDBDatabaseMetadata;
+import org.hswebframework.ezorm.rdb.metadata.RDBSchemaMetadata;
+import org.hswebframework.ezorm.rdb.operator.DatabaseOperator;
+import org.hswebframework.ezorm.rdb.operator.DefaultDatabaseOperator;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.crud.configuration.DialectProvider;
+import org.hswebframework.web.crud.query.DefaultQueryHelper;
+import org.hswebframework.web.crud.query.QueryHelper;
+import org.hswebframework.web.crud.sql.DefaultR2dbcExecutor;
+import org.hswebframework.web.exception.I18nSupportException;
+import org.jetlinks.community.datasource.rdb.command.*;
+import org.jetlinks.core.command.Command;
+import org.jetlinks.core.command.CommandHandler;
+import org.jetlinks.core.lang.SeparatedCharSequence;
+import org.jetlinks.core.lang.SharedPathString;
+import org.jetlinks.community.datasource.AbstractDataSource;
+import org.jetlinks.community.datasource.DataSourceState;
+import org.jetlinks.community.datasource.DataSourceType;
+import org.jetlinks.community.datasource.rdb.command.*;
+import org.jetlinks.community.utils.ObjectMappers;
+import org.reactivestreams.Publisher;
+import org.springframework.boot.autoconfigure.r2dbc.R2dbcProperties;
+import org.springframework.boot.context.properties.PropertyMapper;
+import org.springframework.boot.r2dbc.ConnectionFactoryBuilder;
+import org.springframework.dao.DataAccessResourceFailureException;
+import org.springframework.r2dbc.connection.ConnectionFactoryUtils;
+import org.springframework.r2dbc.connection.R2dbcTransactionManager;
+import org.springframework.transaction.reactive.TransactionalOperator;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Sinks;
+
+import javax.annotation.Nonnull;
+import java.io.Closeable;
+import java.sql.SQLException;
+import java.time.Duration;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class DefaultRDBDataSource extends AbstractDataSource implements RDBDataSource {
+
+ private DatabaseOperator operator;
+
+ private String validateSql;
+
+ private final QueryHelper queryHelper;
+
+ private final List closeables = new CopyOnWriteArrayList<>();
+
+ private final Sinks.One loading = Sinks.one();
+
+ private TransactionalOperator transactionalOperator;
+
+ public DefaultRDBDataSource(String id,
+ RDBDataSourceProperties config) {
+ super(id, config);
+ init();
+ this.queryHelper = new DefaultQueryHelper(operator);
+
+ //刷新RDB数据源命令:Refresh
+ registerHandler(
+ Refresh.class,
+ CommandHandler.of(
+ Refresh.metadata(),
+ (cmd, ignore) -> cmd.execute(operator),
+ Refresh::new
+ )
+ );
+
+ //执行SQL命令:ExecuteSql
+ registerHandler(
+ ExecuteSql.class,
+ CommandHandler.of(
+ ExecuteSql.metadata(),
+ (cmd, ignore) -> loading
+ .asMono()
+ .thenMany(cmd.execute(operator)),
+ ExecuteSql::new
+ )
+ );
+
+ //执行列表查询命令:QueryList
+ registerHandler(
+ QueryList.class,
+ CommandHandler.of(
+ QueryList.metadata(),
+ (cmd, ignore) -> loading
+ .asMono()
+ .thenMany(cmd.execute(operator)),
+ QueryList::new
+ )
+ );
+
+ //执行分页查询命令:QueryPager
+ registerHandler(
+ QueryPager.class,
+ CommandHandler.of(
+ QueryPager.metadata(),
+ (cmd, ignore) -> loading
+ .asMono()
+ .then(cmd.execute(operator)),
+ QueryPager::new
+ )
+ );
+
+ //执行统计数量命令:Count
+ registerHandler(
+ Count.class,
+ CommandHandler.of(
+ Count.metadata(),
+ (cmd, ignore) -> loading
+ .asMono()
+ .then(cmd.execute(operator)),
+ Count::new
+ )
+ );
+
+ }
+
+ private void loadTables() {
+ new Refresh().execute(operator)
+ .doOnTerminate(loading::tryEmitEmpty)
+ .subscribe();
+ }
+
+ @Override
+ public DatabaseOperator operator() {
+ return operator;
+ }
+
+ @Override
+ public QueryHelper helper() {
+ return queryHelper;
+ }
+
+ @Override
+ public DataSourceType getType() {
+ return RDBDataSourceType.rdb;
+ }
+
+ @SneakyThrows
+ public void init() {
+ try {
+ if (getConfig().getType() == RDBDataSourceProperties.Type.r2dbc) {
+ initR2dbc();
+ } else {
+ initJdbc();
+ }
+ loadTables();
+ validateSql = getConfig().getValidateQuery();
+ } catch (Throwable e) {
+ throw translateException(e);
+ }
+ }
+
+ @SneakyThrows
+ synchronized void initR2dbc() {
+ DialectProvider dialect = getConfig().dialectProvider();
+ R2dbcProperties properties;
+ if (MapUtils.isNotEmpty(getConfig().getOthers())) {
+ //使用jsonCopy,FastBeanCopier不支持final字段copy.
+ properties = ObjectMappers
+ .parseJson(ObjectMappers.toJsonBytes(getConfig().getOthers()), R2dbcProperties.class);
+ } else {
+ properties = new R2dbcProperties();
+ }
+ properties.setUrl(getConfig().getUrl());
+ properties.setUsername(getConfig().getUsername());
+ properties.setPassword(getConfig().getPassword());
+
+ PropertyMapper mapper = PropertyMapper.get().alwaysApplyingWhenNonNull();
+
+ ConnectionFactoryBuilder connectionFactoryBuilder = ConnectionFactoryBuilder.withUrl(properties.getUrl());
+
+ mapper.from(properties.getUsername()).whenNonNull().to(connectionFactoryBuilder::username);
+ mapper.from(properties.getPassword()).whenNonNull().to(connectionFactoryBuilder::password);
+
+ R2dbcProperties.Pool pool = properties.getPool();
+ ConnectionPoolConfiguration.Builder builder = ConnectionPoolConfiguration.builder(connectionFactoryBuilder.build());
+ builder.maxLifeTime(Duration.ofMinutes(10));
+ builder.maxAcquireTime(Duration.ofSeconds(10));
+ mapper.from(pool.getMaxIdleTime()).to(builder::maxIdleTime);
+ mapper.from(pool.getInitialSize()).to(builder::initialSize);
+ mapper.from(pool.getMaxSize()).to(builder::maxSize);
+ mapper.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
+ mapper.from(pool.getMaxLifeTime()).whenNonNull().to(builder::maxLifeTime);
+ mapper.from(pool.getMaxAcquireTime()).whenNonNull().to(builder::maxAcquireTime);
+ mapper.from(pool.getMaxLifeTime()).whenNonNull().to(builder::maxLifeTime);
+ mapper.from(pool.getMaxAcquireTime()).whenNonNull().to(builder::maxAcquireTime);
+ mapper.from(pool.getValidationQuery()).whenHasText().to(builder::validationQuery);
+
+ ConnectionPool connectionPool = new ConnectionPool(builder.build());
+ closeables.add(() -> connectionPool.close().subscribe());
+ transactionalOperator = TransactionalOperator.create(new R2dbcTransactionManager(connectionPool));
+
+ DefaultR2dbcExecutor executor = new DefaultR2dbcExecutor() {
+
+ @Override
+ protected Mono getConnection() {
+ return ConnectionFactoryUtils.getConnection(connectionPool);
+ }
+
+ @Override
+ public Mono execute(Publisher request) {
+ return super
+ .execute(request)
+ .as(transactionalOperator::transactional);
+ }
+
+ @Override
+ public Mono update(Publisher request) {
+ return super
+ .update(request)
+ .as(transactionalOperator::transactional);
+ }
+
+ @Override
+ public Flux select(Publisher request, ResultWrapper wrapper) {
+ return super
+ .select(request, wrapper)
+ .as(transactionalOperator::transactional);
+ }
+ };
+ executor.setBindSymbol(dialect.getBindSymbol());
+ executor.setBindCustomSymbol(!executor.getBindSymbol().equals("?"));
+ RDBDatabaseMetadata database = new RDBDatabaseMetadata(dialect.getDialect());
+ database.addFeature(executor);
+ database.addFeature(ReactiveSyncSqlExecutor.of(executor));
+
+ RDBSchemaMetadata schema = dialect.createSchema(getConfig().getSchema());
+ database.addSchema(schema);
+ database.setCurrentSchema(schema);
+ this.operator = DefaultDatabaseOperator.of(database);
+
+ }
+
+ private Throwable translateException(Throwable err) {
+ if (err instanceof IllegalStateException) {
+ if (err.getMessage() != null && err.getMessage().contains("Available drivers")) {
+ return new I18nSupportException("error.unsupported_database_type", err);
+ }
+ }
+ if (err instanceof DataAccessResourceFailureException) {
+ return new I18nSupportException("error.database_access_error", err);
+ }
+ if (err instanceof SQLException) {
+ String msg = err.getMessage();
+ if (msg.contains("No suitable driver")) {
+ return new I18nSupportException("error.unsupported_database_type", err);
+ }
+ return err;
+ }
+ if (err.getClass() == RuntimeException.class) {
+ if (err.getCause() != null && err.getCause() != err) {
+ return translateException(err.getCause());
+ }
+ }
+ return err;
+ }
+
+ synchronized void initJdbc() {
+ HikariDataSource dataSource = new HikariDataSource();
+ dataSource.setJdbcUrl(getConfig().getUrl());
+ dataSource.setUsername(getConfig().getUsername());
+ dataSource.setPassword(getConfig().getPassword());
+ if (MapUtils.isNotEmpty(getConfig().getOthers())) {
+ FastBeanCopier.copy(getConfig().getOthers(), dataSource);
+ }
+ closeables.add(dataSource);
+ RDBDatabaseMetadata database = new RDBDatabaseMetadata(getConfig().dialectProvider().getDialect());
+ database.addFeature(new RDBJdbcReactiveSqlExecutor(dataSource));
+ database.addFeature(new RDBJdbcSyncSqlExecutor(dataSource));
+
+ RDBSchemaMetadata schema = getConfig().dialectProvider().createSchema(getConfig().getSchema());
+ database.addSchema(schema);
+ database.setCurrentSchema(schema);
+
+ this.operator = DefaultDatabaseOperator.of(database);
+ this.transactionalOperator = null;
+ }
+
+ @Override
+ protected Mono checkState() {
+ return operator
+ .sql()
+ .reactive()
+ .select(validateSql)
+ .map(i -> DataSourceState.ok)
+ .onErrorResume(err -> Mono.just(DataSourceState.error(translateException(err))))
+ .last();
+ }
+
+ @Override
+ @SuppressWarnings("all")
+ protected R executeUndefinedCommand(@Nonnull Command command) {
+ if (command instanceof RDBCommand) {
+ R r = ((RDBCommand) command).execute(operator);
+ if (transactionalOperator != null) {
+ if (r instanceof Mono) {
+ return (R) transactionalOperator.transactional(((Mono) r));
+ } else if (r instanceof Flux) {
+ return (R) transactionalOperator.transactional(((Flux) r));
+ }
+ }
+ return r;
+ }
+ return super.executeUndefinedCommand(command);
+ }
+
+ @Override
+ protected void handleSetConfig(RDBDataSourceProperties oldConfig,
+ RDBDataSourceProperties newConfig) {
+ if (oldConfig == null) {
+ return;
+ }
+ releaseOld();
+ init();
+ }
+
+ protected void releaseOld() {
+ closeables.removeIf(closeable -> {
+ try {
+ closeable.close();
+ } catch (Throwable ignore) {
+ }
+ return true;
+ });
+ }
+
+ @Override
+ protected void doOnDispose() {
+ releaseOld();
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSource.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSource.java
new file mode 100644
index 00000000..b4cfe4b3
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSource.java
@@ -0,0 +1,33 @@
+package org.jetlinks.community.datasource.rdb;
+
+import org.hswebframework.ezorm.rdb.operator.DatabaseOperator;
+import org.hswebframework.web.crud.query.QueryHelper;
+import org.jetlinks.community.datasource.rdb.command.*;
+import org.jetlinks.community.datasource.DataSource;
+import org.jetlinks.community.datasource.DataSourceType;
+import org.jetlinks.core.command.Command;
+
+import javax.annotation.Nonnull;
+
+
+public interface RDBDataSource extends DataSource {
+
+ RDBDataSourceProperties getConfig();
+
+ RDBDataSourceProperties copyConfig();
+
+ DatabaseOperator operator();
+
+ QueryHelper helper();
+
+ @Nonnull
+ @Override
+ default R execute(@Nonnull Command command) {
+ return DataSource.super.execute(command);
+ }
+
+ @Override
+ default DataSourceType getType() {
+ return RDBDataSourceType.rdb;
+ }
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProperties.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProperties.java
new file mode 100644
index 00000000..4c092446
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProperties.java
@@ -0,0 +1,93 @@
+package org.jetlinks.community.datasource.rdb;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.Setter;
+import org.hswebframework.web.crud.configuration.DialectProvider;
+import org.hswebframework.web.crud.configuration.DialectProviders;
+import org.hswebframework.web.crud.configuration.EasyormProperties;
+import org.hswebframework.web.exception.ValidationException;
+import org.hswebframework.web.validator.ValidatorUtils;
+
+import jakarta.validation.constraints.NotBlank;
+import java.net.URI;
+import java.util.Map;
+
+@Getter
+@Setter
+public class RDBDataSourceProperties {
+
+ private Type type;
+
+ @Schema(description = "url")
+ @NotBlank
+ private String url;
+
+ @NotBlank
+ @Schema(description = "数据库")
+ private String schema;
+
+ @Schema(description = "用户名")
+ private String username;
+
+ @Schema(description = "密码")
+ private String password;
+
+ private Map others;
+
+ @Schema(description = "数据库方言")
+ private String dialect;
+
+ @Getter(AccessLevel.PRIVATE)
+ @Setter(AccessLevel.PRIVATE)
+ private transient DialectProvider dialectProvider;
+
+ public String getValidateQuery() {
+ return dialectProvider().getValidationSql();
+ }
+
+ public DialectProvider dialectProvider() {
+ URI uri = URI.create(getUrl());
+
+ if (null == dialectProvider) {
+ if (null != dialect) {
+ dialectProvider = DialectProviders.lookup(dialect);
+ } else if (getUrl().contains("mysql") || getUrl().contains("mariadb")) {
+ return EasyormProperties.DialectEnum.mysql;
+ } else if (getUrl().contains("postgresql")) {
+ return EasyormProperties.DialectEnum.postgres;
+ } else if (getUrl().contains("oracle")) {
+ return EasyormProperties.DialectEnum.oracle;
+ } else if (getUrl().contains("mssql") || getUrl().contains("sqlserver")) {
+ return EasyormProperties.DialectEnum.mssql;
+ } else if (getUrl().contains("h2")) {
+ return EasyormProperties.DialectEnum.h2;
+ } else if (getUrl().contains("dm")) {
+ return DialectProviders.lookup("dm");
+ } else {
+ throw new ValidationException("url", "error.unsupported_database_type", uri.getFragment());
+ }
+ }
+ return dialectProvider;
+ }
+
+ public RDBDataSourceProperties validate() {
+ ValidatorUtils.tryValidate(this);
+ return this;
+ }
+
+ public Type getType() {
+ if (type == null) {
+ type = url.startsWith("r2dbc") ? Type.r2dbc : Type.jdbc;
+ }
+ return type;
+ }
+
+
+ public enum Type {
+ jdbc,
+ r2dbc;
+ }
+
+}
diff --git a/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProvider.java b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProvider.java
new file mode 100644
index 00000000..12690fee
--- /dev/null
+++ b/jetlinks-components/datasource-component/src/main/java/org/jetlinks/community/datasource/rdb/RDBDataSourceProvider.java
@@ -0,0 +1,217 @@
+package org.jetlinks.community.datasource.rdb;
+
+import io.swagger.v3.oas.annotations.media.Schema;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.hswebframework.ezorm.rdb.metadata.RDBColumnMetadata;
+import org.hswebframework.ezorm.rdb.operator.DatabaseOperator;
+import org.hswebframework.web.api.crud.entity.QueryParamEntity;
+import org.hswebframework.web.bean.FastBeanCopier;
+import org.hswebframework.web.crud.query.DefaultQueryHelper;
+import org.hswebframework.web.crud.query.QueryAnalyzer;
+import org.hswebframework.web.crud.query.QueryHelper;
+import org.jetlinks.community.datasource.DataSource;
+import org.jetlinks.community.datasource.DataSourceConfig;
+import org.jetlinks.community.datasource.DataSourceProvider;
+import org.jetlinks.community.datasource.DataSourceType;
+import org.jetlinks.core.command.CommandHandler;
+import org.jetlinks.core.metadata.DataType;
+import org.jetlinks.core.metadata.PropertyMetadata;
+import org.jetlinks.core.metadata.SimplePropertyMetadata;
+import org.jetlinks.core.metadata.types.*;
+import org.jetlinks.core.monitor.Monitor;
+import org.jetlinks.community.datasource.*;
+import org.jetlinks.community.datasource.rdb.command.QueryList;
+import org.jetlinks.community.datasource.rdb.command.QueryPager;
+import org.jetlinks.community.datasource.rdb.command.RDBRequestListCommand;
+import org.jetlinks.community.datasource.rdb.command.RDBRequestPagerCommand;
+import org.jetlinks.community.things.utils.ThingsDatabaseUtils;
+import org.jetlinks.sdk.server.commons.cmd.QueryPagerCommand;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
+import javax.annotation.Nonnull;
+import java.util.*;
+import java.util.function.Function;
+
+@Component
+public class RDBDataSourceProvider implements DataSourceProvider {
+
+ @Nonnull
+ @Override
+ public DataSourceType getType() {
+ return RDBDataSourceType.rdb;
+ }
+
+ @Nonnull
+ @Override
+ public Mono createDataSource(@Nonnull DataSourceConfig properties) {
+
+ return Mono
+ .fromCallable(() -> RDBDataSourceProvider
+ .create(properties.getId(),
+ FastBeanCopier.copy(properties.getConfiguration(), new RDBDataSourceProperties())
+ .validate()
+ )
+ )
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ @Nonnull
+ @Override
+ public Mono reload(@Nonnull DataSource dataSource, @Nonnull DataSourceConfig properties) {
+ return Mono
+ .defer(() -> {
+ RDBDataSourceProperties dataSourceProperties = FastBeanCopier
+ .copy(properties.getConfiguration(), RDBDataSourceProperties::new)
+ .validate();
+ if (dataSource.isWrapperFor(DefaultRDBDataSource.class)) {
+ dataSource
+ .unwrap(DefaultRDBDataSource.class)
+ .setConfig(dataSourceProperties);
+ return Mono.just(dataSource);
+ }
+ dataSource.dispose();
+ return createDataSource(properties);
+ })
+ .subscribeOn(Schedulers.boundedElastic());
+ }
+
+ public static RDBDataSource create(String id, RDBDataSourceProperties properties) {
+ return new DefaultRDBDataSource(id, properties);
+ }
+
+
+ @Override
+ public Mono> createCommandHandler(CommandConfiguration configuration) {
+ Configuration config = FastBeanCopier.copy(configuration.getConfiguration(), Configuration.class);
+ RDBDefinition rdbDefinition = config.getRdbDefinition();
+ Boolean paging = rdbDefinition.getPaging();
+
+ return getRdbDataSource(configuration)
+ .flatMap(source -> {
+ List resultColumns = getResultColumns(
+ rdbDefinition.getSql(),
+ source.helper(),
+ configuration.getMonitor());
+ if (paging) {
+ return Mono.just(
+ RDBRequestPagerCommand
+ .createQueryHandler(
+ configuration.getCommandId(),
+ configuration.getCommandName(),
+ metadata -> metadata.setOutput(QueryPagerCommand.createOutputType(resultColumns)),
+ cmd -> getRdbDataSource(configuration)
+ .flatMap(database -> database.execute(new QueryPager().with(cmd.with("sql", rdbDefinition.getSql()).asMap())))
+ )
+ );
+ }
+ return Mono.just(
+ RDBRequestListCommand
+ .createQueryHandler(
+ configuration.getCommandId(),
+ configuration.getCommandName(),
+ metadata -> metadata.setOutput(getResultType(resultColumns)),
+ cmd -> getRdbDataSource(configuration)
+ .flatMapMany(database -> database.execute(new QueryList().with(cmd.with("sql", rdbDefinition.getSql()).asMap())))
+ ));
+ });
+
+ }
+
+ private static Mono getRdbDataSource(CommandConfiguration configuration) {
+ return configuration
+ .getDataSource()
+ .map(datasource -> datasource.unwrap(RDBDataSource.class));
+ }
+
+ private Flux