diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java index 96b04c19b950..7c82f5733955 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceImpl.java @@ -35,15 +35,18 @@ import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.TaskDefinition; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.collections4.SetUtils; import org.apache.commons.lang3.StringUtils; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Date; import java.util.List; @@ -82,6 +85,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme @Autowired private TaskDefinitionDao taskDefinitionDao; + @Autowired + private WorkerGroupDao workerGroupDao; + /** * create environment * @@ -111,6 +117,9 @@ public Long createEnvironment(User loginUser, throw new ServiceException(Status.ENVIRONMENT_NAME_EXISTS, name); } + List workerGroupList = parseWorkerGroupList(workerGroups); + Map workerGroupMap = queryWorkerGroupMap(workerGroupList); + Environment env = new Environment(); env.setName(name); env.setConfig(config); @@ -121,25 +130,20 @@ public Long createEnvironment(User loginUser, env.setCode(CodeGenerateUtils.genCode()); if (environmentMapper.insert(env) > 0) { - if (!StringUtils.isEmpty(workerGroups)) { - List workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference>() { + if (CollectionUtils.isNotEmpty(workerGroupList)) { + workerGroupList.forEach(workerGroup -> { + EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); + relation.setEnvironmentCode(env.getCode()); + relation.setWorkerGroupId(workerGroupMap.get(workerGroup).getId()); + relation.setWorkerGroup(workerGroup); + relation.setOperator(loginUser.getId()); + relation.setCreateTime(new Date()); + relation.setUpdateTime(new Date()); + relationMapper.insert(relation); + log.info( + "Environment-WorkerGroup relation create complete, environmentName:{}, workerGroup:{}.", + env.getName(), relation.getWorkerGroup()); }); - if (CollectionUtils.isNotEmpty(workerGroupList)) { - workerGroupList.stream().forEach(workerGroup -> { - if (!StringUtils.isEmpty(workerGroup)) { - EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); - relation.setEnvironmentCode(env.getCode()); - relation.setWorkerGroup(workerGroup); - relation.setOperator(loginUser.getId()); - relation.setCreateTime(new Date()); - relation.setUpdateTime(new Date()); - relationMapper.insert(relation); - log.info( - "Environment-WorkerGroup relation create complete, environmentName:{}, workerGroup:{}.", - env.getName(), relation.getWorkerGroup()); - } - }); - } } return env.getCode(); } @@ -329,13 +333,8 @@ public Environment updateEnvironmentByCode(User loginUser, Long code, String nam throw new ServiceException(Status.ENVIRONMENT_NAME_EXISTS, name); } - Set workerGroupSet; - if (!StringUtils.isEmpty(workerGroups)) { - workerGroupSet = JSONUtils.parseObject(workerGroups, new TypeReference>() { - }); - } else { - workerGroupSet = new TreeSet<>(); - } + Set workerGroupSet = new TreeSet<>(parseWorkerGroupList(workerGroups)); + Map workerGroupMap = queryWorkerGroupMap(workerGroupSet); Set existWorkerGroupSet = relationMapper .queryByEnvironmentCode(code) @@ -374,6 +373,7 @@ public Environment updateEnvironmentByCode(User loginUser, Long code, String nam if (StringUtils.isNotEmpty(key)) { EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); relation.setEnvironmentCode(code); + relation.setWorkerGroupId(workerGroupMap.get(key).getId()); relation.setWorkerGroup(key); relation.setUpdateTime(new Date()); relation.setCreateTime(new Date()); @@ -418,6 +418,43 @@ private void checkUsedEnvironmentWorkerGroupRelation(Set deleteKeySet, } } + private List parseWorkerGroupList(String workerGroups) { + if (StringUtils.isEmpty(workerGroups)) { + return Collections.emptyList(); + } + List workerGroupList = JSONUtils.parseObject(workerGroups, new TypeReference>() { + }); + if (CollectionUtils.isEmpty(workerGroupList)) { + return Collections.emptyList(); + } + return workerGroupList.stream() + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toList()); + } + + private Map queryWorkerGroupMap(Collection workerGroupNames) { + if (CollectionUtils.isEmpty(workerGroupNames)) { + return Collections.emptyMap(); + } + Set nonEmptyWorkerGroupNames = workerGroupNames.stream() + .filter(StringUtils::isNotEmpty) + .collect(Collectors.toCollection(TreeSet::new)); + if (CollectionUtils.isEmpty(nonEmptyWorkerGroupNames)) { + return Collections.emptyMap(); + } + + List workerGroups = workerGroupDao.queryWorkerGroupByNames(nonEmptyWorkerGroupNames); + Map workerGroupMap = CollectionUtils.emptyIfNull(workerGroups).stream() + .collect(Collectors.toMap(WorkerGroup::getName, workerGroup -> workerGroup)); + Set notExistWorkerGroups = + SetUtils.difference(nonEmptyWorkerGroupNames, workerGroupMap.keySet()).toSet(); + if (CollectionUtils.isNotEmpty(notExistWorkerGroups)) { + throw new ServiceException(Status.WORKER_GROUP_NOT_EXIST, + String.join(",", new TreeSet<>(notExistWorkerGroups))); + } + return workerGroupMap; + } + protected void checkParams(String name, String config, String workerGroups) { if (StringUtils.isEmpty(name)) { throw new ServiceException(Status.ENVIRONMENT_NAME_IS_NULL); diff --git a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java index ea7cc151cccd..708872bffd40 100644 --- a/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java +++ b/dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/impl/WorkerGroupServiceImpl.java @@ -182,7 +182,10 @@ private void checkWorkerGroupDependencies(WorkerGroup workerGroup) { // check if the worker group has any dependent environments List environmentWorkerGroupRelations = environmentWorkerGroupRelationMapper.selectList(new QueryWrapper() - .lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName())); + .lambda() + .eq(EnvironmentWorkerGroupRelation::getWorkerGroupId, workerGroup.getId()) + .or() + .eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName())); if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) { throw new ServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java index e2ba1b7e6a86..f12412b38b8e 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/WorkerGroupServiceTest.java @@ -32,6 +32,7 @@ import org.apache.dolphinscheduler.common.enums.AuthorizationType; import org.apache.dolphinscheduler.common.enums.UserType; import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus; +import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.User; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.entity.WorkflowInstance; @@ -43,7 +44,10 @@ import org.apache.dolphinscheduler.registry.api.RegistryClient; import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType; +import org.apache.ibatis.builder.MapperBuilderAssistant; + import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -51,6 +55,7 @@ import java.util.Set; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; @@ -63,6 +68,10 @@ import org.slf4j.LoggerFactory; import org.springframework.dao.DuplicateKeyException; +import com.baomidou.mybatisplus.core.MybatisConfiguration; +import com.baomidou.mybatisplus.core.conditions.Wrapper; +import com.baomidou.mybatisplus.core.metadata.TableInfoHelper; + @ExtendWith(MockitoExtension.class) @MockitoSettings(strictness = Strictness.LENIENT) public class WorkerGroupServiceTest { @@ -97,6 +106,14 @@ public class WorkerGroupServiceTest { private final String GROUP_NAME = "testWorkerGroup"; + @BeforeEach + public void setUp() { + if (TableInfoHelper.getTableInfo(EnvironmentWorkerGroupRelation.class) == null) { + TableInfoHelper.initTableInfo(new MapperBuilderAssistant(new MybatisConfiguration(), ""), + EnvironmentWorkerGroupRelation.class); + } + } + private User getLoginUser() { User loginUser = new User(); loginUser.setUserType(UserType.GENERAL_USER); @@ -260,6 +277,66 @@ public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() { assertDoesNotThrow(() -> workerGroupService.deleteWorkerGroupById(loginUser, 1)); } + @Test + public void giveEnvironmentRelationMatchedByWorkerGroupId_whenDeleteWorkerGroupById_expectEnvironmentDependency() { + User loginUser = getLoginUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1); + when(workerGroupDao.queryById(1)).thenReturn(workerGroup); + when(workflowInstanceDao.queryByWorkerGroupNameAndStatus(workerGroup.getName(), + WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null); + when(taskDefinitionDao.queryByWorkerGroup(Mockito.any())).thenReturn(null); + when(scheduleDao.queryScheduleByWorkerGroup(Mockito.any())).thenReturn(null); + + EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); + relation.setWorkerGroupId(workerGroup.getId()); + relation.setWorkerGroup("stale-" + workerGroup.getName()); + when(environmentWorkerGroupRelationMapper.selectList(Mockito.any())).thenAnswer(invocation -> { + Wrapper wrapper = invocation.getArgument(0); + String sqlSegment = wrapper.getSqlSegment(); + if (sqlSegment.contains("worker_group_id") && sqlSegment.contains("worker_group")) { + return Collections.singletonList(relation); + } + return Collections.emptyList(); + }); + + assertThrowsServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, + () -> workerGroupService.deleteWorkerGroupById(loginUser, 1)); + } + + @Test + public void giveEnvironmentRelationMatchedOnlyByWorkerGroupNameWithDifferentWorkerGroupId_whenDeleteWorkerGroupById_expectEnvironmentDependency() { + User loginUser = getLoginUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1, + WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1, + baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1); + when(workerGroupDao.queryById(1)).thenReturn(workerGroup); + when(workflowInstanceDao.queryByWorkerGroupNameAndStatus(workerGroup.getName(), + WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null); + when(taskDefinitionDao.queryByWorkerGroup(Mockito.any())).thenReturn(null); + when(scheduleDao.queryScheduleByWorkerGroup(Mockito.any())).thenReturn(null); + + EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); + relation.setWorkerGroupId(2); + relation.setWorkerGroup(workerGroup.getName()); + when(environmentWorkerGroupRelationMapper.selectList(Mockito.any())).thenAnswer(invocation -> { + Wrapper wrapper = invocation.getArgument(0); + String sqlSegment = wrapper.getSqlSegment(); + Assertions.assertTrue(sqlSegment.contains("worker_group_id")); + Assertions.assertTrue(sqlSegment.contains("worker_group =") || sqlSegment.contains("worker_group=")); + Assertions.assertFalse(sqlSegment.contains("IS NULL")); + return Collections.singletonList(relation); + }); + + assertThrowsServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS, + () -> workerGroupService.deleteWorkerGroupById(loginUser, 1)); + } + @Test public void testQueryAllGroupWithDefault() { List workerGroups = workerGroupService.queryAllGroup(getLoginUser()); diff --git a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceTest.java b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceTest.java index e7cd2a7b433e..5a97f4ba0244 100644 --- a/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceTest.java +++ b/dolphinscheduler-api/src/test/java/org/apache/dolphinscheduler/api/service/impl/EnvironmentServiceTest.java @@ -39,9 +39,11 @@ import org.apache.dolphinscheduler.dao.entity.Environment; import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation; import org.apache.dolphinscheduler.dao.entity.User; +import org.apache.dolphinscheduler.dao.entity.WorkerGroup; import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper; import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper; import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao; +import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; import org.apache.commons.collections4.CollectionUtils; @@ -55,6 +57,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.MockedStatic; @@ -88,6 +91,9 @@ public class EnvironmentServiceTest { @Mock private TaskDefinitionDao taskDefinitionDao; + @Mock + private WorkerGroupDao workerGroupDao; + @Mock private ResourcePermissionCheckService resourcePermissionCheckService; @@ -120,6 +126,8 @@ public void testCreateEnvironment() { .createEnvironment(adminUser, environmentName, getConfig(), getDesc(), workerGroups)); when(environmentMapper.insert(any(Environment.class))).thenReturn(1); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.singletonList(getWorkerGroup(1, "default"))); when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1); assertThrowsServiceException(Status.DESCRIPTION_TOO_LONG_ERROR, @@ -140,6 +148,72 @@ public void testCreateEnvironment() { } } + @Test + public void testCreateEnvironmentWithNotExistWorkerGroup() { + User adminUser = getAdminUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT, + adminUser.getId(), ENVIRONMENT_CREATE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null, + 0, baseServiceLogger)).thenReturn(true); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.emptyList()); + + assertThrowsServiceException(Status.WORKER_GROUP_NOT_EXIST, + () -> environmentService.createEnvironment(adminUser, "testName", "test", "test", workerGroups)); + } + + @Test + public void testCreateEnvironmentWithWorkerGroupId() { + User adminUser = getAdminUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT, + adminUser.getId(), ENVIRONMENT_CREATE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null, + 0, baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1, "default"); + when(environmentMapper.insert(any(Environment.class))).thenReturn(1); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.singletonList(workerGroup)); + when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1); + + environmentService.createEnvironment(adminUser, "testName", "test", "test", workerGroups); + + ArgumentCaptor relationCaptor = + ArgumentCaptor.forClass(EnvironmentWorkerGroupRelation.class); + Mockito.verify(relationMapper).insert(relationCaptor.capture()); + Mockito.verify(workerGroupDao).queryWorkerGroupByNames(Collections.singleton("default")); + assertEquals(workerGroup.getId(), relationCaptor.getValue().getWorkerGroupId()); + } + + @Test + public void testCreateEnvironmentWithMultipleWorkerGroupsUsesBatchQuery() { + User adminUser = getAdminUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT, + adminUser.getId(), ENVIRONMENT_CREATE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null, + 0, baseServiceLogger)).thenReturn(true); + String multipleWorkerGroups = "[\"default\",\"server2\"]"; + Set workerGroupNames = new HashSet<>(); + workerGroupNames.add("default"); + workerGroupNames.add("server2"); + when(environmentMapper.insert(any(Environment.class))).thenReturn(1); + when(workerGroupDao.queryWorkerGroupByNames(workerGroupNames)) + .thenReturn(Lists.newArrayList(getWorkerGroup(1, "default"), getWorkerGroup(2, "server2"))); + when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1); + + environmentService.createEnvironment(adminUser, "testName", "test", "test", multipleWorkerGroups); + + Mockito.verify(workerGroupDao, Mockito.times(1)).queryWorkerGroupByNames(workerGroupNames); + ArgumentCaptor relationCaptor = + ArgumentCaptor.forClass(EnvironmentWorkerGroupRelation.class); + Mockito.verify(relationMapper, Mockito.times(2)).insert(relationCaptor.capture()); + Assertions.assertTrue(relationCaptor.getAllValues().stream() + .anyMatch(relation -> "default".equals(relation.getWorkerGroup()) + && Integer.valueOf(1).equals(relation.getWorkerGroupId()))); + Assertions.assertTrue(relationCaptor.getAllValues().stream() + .anyMatch(relation -> "server2".equals(relation.getWorkerGroup()) + && Integer.valueOf(2).equals(relation.getWorkerGroupId()))); + } + @Test public void testCheckParams() { assertThrowsServiceException(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID, @@ -171,6 +245,8 @@ public void testUpdateEnvironmentByCode() { .updateEnvironmentByCode(adminUser, 2L, environmentName, getConfig(), getDesc(), workerGroups)); when(environmentMapper.update(any(Environment.class), any(Wrapper.class))).thenReturn(-1); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.singletonList(getWorkerGroup(1, "default"))); assertThrowsServiceException(Status.UPDATE_ENVIRONMENT_ERROR, () -> environmentService.updateEnvironmentByCode(adminUser, 1L, "testName", "test", "test", workerGroups)); @@ -193,6 +269,44 @@ public void testUpdateEnvironmentByCode() { "")); } + @Test + public void testUpdateEnvironmentByCodeWithNotExistWorkerGroup() { + User adminUser = getAdminUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT, + adminUser.getId(), ENVIRONMENT_UPDATE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null, + 0, baseServiceLogger)).thenReturn(true); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.emptyList()); + + assertThrowsServiceException(Status.WORKER_GROUP_NOT_EXIST, + () -> environmentService.updateEnvironmentByCode(adminUser, 1L, "testName", "test", "test", + workerGroups)); + } + + @Test + public void testUpdateEnvironmentByCodeWithWorkerGroupId() { + User adminUser = getAdminUser(); + when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT, + adminUser.getId(), ENVIRONMENT_UPDATE, baseServiceLogger)).thenReturn(true); + when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null, + 0, baseServiceLogger)).thenReturn(true); + WorkerGroup workerGroup = getWorkerGroup(1, "default"); + when(environmentMapper.update(any(Environment.class), any(Wrapper.class))).thenReturn(1); + when(relationMapper.queryByEnvironmentCode(1L)).thenReturn(Collections.emptyList()); + when(workerGroupDao.queryWorkerGroupByNames(Collections.singleton("default"))) + .thenReturn(Collections.singletonList(workerGroup)); + when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1); + + environmentService.updateEnvironmentByCode(adminUser, 1L, "testName", "test", "test", workerGroups); + + ArgumentCaptor relationCaptor = + ArgumentCaptor.forClass(EnvironmentWorkerGroupRelation.class); + Mockito.verify(relationMapper).insert(relationCaptor.capture()); + Mockito.verify(workerGroupDao).queryWorkerGroupByNames(Collections.singleton("default")); + assertEquals(workerGroup.getId(), relationCaptor.getValue().getWorkerGroupId()); + } + @Test public void testQueryAllEnvironmentList() { when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT, @@ -325,6 +439,13 @@ private EnvironmentWorkerGroupRelation getEnvironmentWorkerGroup() { return relation; } + private WorkerGroup getWorkerGroup(int id, String name) { + WorkerGroup workerGroup = new WorkerGroup(); + workerGroup.setId(id); + workerGroup.setName(name); + return workerGroup; + } + /** * create an environment description */ diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java index 243edbc67ea3..0ed20c3ed609 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/EnvironmentWorkerGroupRelation.java @@ -37,6 +37,8 @@ public class EnvironmentWorkerGroupRelation { /** * worker group id */ + private Integer workerGroupId; + private String workerGroup; /** diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java index 06a39223efe1..3e3912a5c8ee 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapper.java @@ -22,6 +22,7 @@ import org.apache.ibatis.annotations.Param; +import java.util.Collection; import java.util.List; import com.baomidou.mybatisplus.core.mapper.BaseMapper; @@ -49,6 +50,14 @@ public interface WorkerGroupMapper extends BaseMapper { */ List queryWorkerGroupByName(@Param("name") String name); + /** + * query worker groups by names + * + * @param names worker group names + * @return worker group list + */ + List queryWorkerGroupByNames(@Param("names") Collection names); + int updateAddrListByWorkerGroupName(@Param("name") String name, @Param("addrList") String addrList, @Param("source") WorkerGroupSource source); diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java index 7db0c7d10f6c..b5f7fb321dbc 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/WorkerGroupDao.java @@ -19,6 +19,7 @@ import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import java.util.Collection; import java.util.List; public interface WorkerGroupDao extends IDao { @@ -30,4 +31,6 @@ public interface WorkerGroupDao extends IDao { List queryAllWorkerGroup(); List queryWorkerGroupByName(String name); + + List queryWorkerGroupByNames(Collection names); } diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java index b5203bb661c9..4b0a6e528930 100644 --- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java +++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/repository/impl/WorkerGroupDaoImpl.java @@ -22,6 +22,10 @@ import org.apache.dolphinscheduler.dao.repository.BaseDao; import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao; +import org.apache.commons.collections4.CollectionUtils; + +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.stream.Collectors; @@ -58,4 +62,12 @@ public List queryAllWorkerGroup() { public List queryWorkerGroupByName(String name) { return mybatisMapper.queryWorkerGroupByName(name); } + + @Override + public List queryWorkerGroupByNames(Collection names) { + if (CollectionUtils.isEmpty(names)) { + return Collections.emptyList(); + } + return mybatisMapper.queryWorkerGroupByNames(names); + } } diff --git a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml index 7ea959d6014c..19b2780cc9d4 100644 --- a/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml +++ b/dolphinscheduler-dao/src/main/resources/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapper.xml @@ -19,7 +19,7 @@ - id, environment_code, worker_group, operator, create_time, update_time + id, environment_code, worker_group_id, worker_group, operator, create_time, update_time + update t_ds_worker_group diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql index 996fce0732b9..847d23a36abb 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql @@ -1158,6 +1158,7 @@ CREATE TABLE t_ds_environment_worker_group_relation ( id int NOT NULL AUTO_INCREMENT, environment_code bigint(20) NOT NULL, + worker_group_id int NOT NULL, worker_group varchar(255) NOT NULL, operator int DEFAULT NULL, create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql index 7067cd7429aa..7a81fdd360a8 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql @@ -1149,7 +1149,8 @@ DROP TABLE IF EXISTS `t_ds_environment_worker_group_relation`; CREATE TABLE `t_ds_environment_worker_group_relation` ( `id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id', `environment_code` bigint(20) NOT NULL COMMENT 'environment code', - `worker_group` varchar(255) NOT NULL COMMENT 'worker group id', + `worker_group_id` int(11) NOT NULL COMMENT 'worker group id', + `worker_group` varchar(255) NOT NULL COMMENT 'worker group name', `operator` int(11) DEFAULT NULL COMMENT 'operator user id', `create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP, `update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, diff --git a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql index 36261d7a8c2d..81246ace7a37 100644 --- a/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql +++ b/dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql @@ -1160,6 +1160,7 @@ DROP TABLE IF EXISTS t_ds_environment_worker_group_relation; CREATE TABLE t_ds_environment_worker_group_relation ( id serial NOT NULL, environment_code bigint NOT NULL, + worker_group_id int NOT NULL, worker_group varchar(255) NOT NULL, operator int DEFAULT NULL, create_time timestamp DEFAULT NULL, diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..75c3f0f913a8 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_ddl.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE `t_ds_environment_worker_group_relation` +ADD COLUMN `worker_group_id` int(11) DEFAULT NULL COMMENT 'worker group id'; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..5daf53fcebe5 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/mysql/dolphinscheduler_dml.sql @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +UPDATE `t_ds_environment_worker_group_relation` ewgr +INNER JOIN `t_ds_worker_group` wg ON ewgr.`worker_group` = wg.`name` +SET ewgr.`worker_group_id` = wg.`id` +WHERE ewgr.`worker_group_id` IS NULL; + +ALTER TABLE `t_ds_environment_worker_group_relation` +MODIFY COLUMN `worker_group_id` int(11) NOT NULL COMMENT 'worker group id'; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql new file mode 100644 index 000000000000..65fecfdd5ddc --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_ddl.sql @@ -0,0 +1,19 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +ALTER TABLE t_ds_environment_worker_group_relation +ADD COLUMN IF NOT EXISTS worker_group_id int DEFAULT NULL; diff --git a/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql new file mode 100644 index 000000000000..919d4331a0c1 --- /dev/null +++ b/dolphinscheduler-dao/src/main/resources/sql/upgrade/3.5.0_schema/postgresql/dolphinscheduler_dml.sql @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +UPDATE t_ds_environment_worker_group_relation ewgr +SET worker_group_id = wg.id +FROM t_ds_worker_group wg +WHERE ewgr.worker_group_id IS NULL + AND ewgr.worker_group = wg.name; + +ALTER TABLE t_ds_environment_worker_group_relation +ALTER COLUMN worker_group_id SET NOT NULL; diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapperTest.java index 687a6f229cec..9e1d18279268 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/EnvironmentWorkerGroupRelationMapperTest.java @@ -59,6 +59,7 @@ private EnvironmentWorkerGroupRelation insertOne() { // insertOne EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation(); relation.setEnvironmentCode(1L); + relation.setWorkerGroupId(1); relation.setWorkerGroup("default"); relation.setOperator(1); relation.setUpdateTime(new Date()); @@ -76,6 +77,7 @@ public void testQuery() { // query List relations = environmentWorkerGroupRelationMapper.selectList(null); Assertions.assertEquals(relations.size(), 1); + Assertions.assertEquals(1, relations.get(0).getWorkerGroupId()); } @Test diff --git a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java index 84f140ff0be7..9c9587b91671 100644 --- a/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java +++ b/dolphinscheduler-dao/src/test/java/org/apache/dolphinscheduler/dao/mapper/WorkerGroupMapperTest.java @@ -20,6 +20,7 @@ import org.apache.dolphinscheduler.dao.BaseDaoTest; import org.apache.dolphinscheduler.dao.entity.WorkerGroup; +import java.util.Collections; import java.util.Date; import java.util.List; @@ -93,6 +94,18 @@ public void testQueryWorkerGroupByName() { Assertions.assertEquals(0, workerGroups.size()); } + @Test + public void testQueryWorkerGroupByNames() { + WorkerGroup workerGroup = insertOneWorkerGroup(); + List workerGroups = + workerGroupMapper.queryWorkerGroupByNames(Collections.singleton(workerGroup.getName())); + Assertions.assertEquals(1, workerGroups.size()); + Assertions.assertEquals(workerGroup.getName(), workerGroups.get(0).getName()); + + workerGroups = workerGroupMapper.queryWorkerGroupByNames(Collections.singleton("server2")); + Assertions.assertEquals(0, workerGroups.size()); + } + /** * test update workerGroup */