Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.TaskDefinition;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.SetUtils;
Expand Down Expand Up @@ -82,6 +84,9 @@ public class EnvironmentServiceImpl extends BaseServiceImpl implements Environme
@Autowired
private TaskDefinitionDao taskDefinitionDao;

@Autowired
private WorkerGroupDao workerGroupDao;

/**
* create environment
*
Expand Down Expand Up @@ -129,6 +134,7 @@ public Long createEnvironment(User loginUser,
if (!StringUtils.isEmpty(workerGroup)) {
EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setEnvironmentCode(env.getCode());
relation.setWorkerGroupId(getWorkerGroupId(workerGroup));
relation.setWorkerGroup(workerGroup);
relation.setOperator(loginUser.getId());
relation.setCreateTime(new Date());
Expand Down Expand Up @@ -374,6 +380,7 @@ public Environment updateEnvironmentByCode(User loginUser, Long code, String nam
if (StringUtils.isNotEmpty(key)) {
EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setEnvironmentCode(code);
relation.setWorkerGroupId(getWorkerGroupId(key));
relation.setWorkerGroup(key);
relation.setUpdateTime(new Date());
relation.setCreateTime(new Date());
Expand Down Expand Up @@ -418,6 +425,14 @@ private void checkUsedEnvironmentWorkerGroupRelation(Set<String> deleteKeySet,
}
}

private Integer getWorkerGroupId(String workerGroupName) {
List<WorkerGroup> workerGroups = workerGroupDao.queryWorkerGroupByName(workerGroupName);
if (CollectionUtils.isEmpty(workerGroups)) {
return null;
}
return workerGroups.get(0).getId();
}

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Can workerGroupDao.queryWorkerGroupByName return list?
  2. If the worker group is not exist, need to throw exception rather than return null.
  3. We need to use batch query, rather than query single item at a for-each.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 05ef512. EnvironmentServiceImpl now parses the worker groups once and loads them with workerGroupDao.queryWorkerGroupByNames(...). Missing worker groups now throw WORKER_GROUP_NOT_EXIST instead of returning null.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed on the current branch: WorkerGroupDao/Mapper returns a list, EnvironmentServiceImpl performs a single queryWorkerGroupByNames batch lookup, maps results by name, and throws WORKER_GROUP_NOT_EXIST when any requested worker group is missing.

protected void checkParams(String name, String config, String workerGroups) {
if (StringUtils.isEmpty(name)) {
throw new ServiceException(Status.ENVIRONMENT_NAME_IS_NULL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ private void checkWorkerGroupDependencies(WorkerGroup workerGroup) {
// check if the worker group has any dependent environments
List<EnvironmentWorkerGroupRelation> environmentWorkerGroupRelations =
environmentWorkerGroupRelationMapper.selectList(new QueryWrapper<EnvironmentWorkerGroupRelation>()
.lambda().eq(EnvironmentWorkerGroupRelation::getWorkerGroup, workerGroup.getName()));
.eq("worker_group_id", workerGroup.getId())
.or(queryWrapper -> queryWrapper.isNull("worker_group_id")
.eq("worker_group", workerGroup.getName())));

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The worker group name shouldn't be deprecated.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 05ef512. The worker group dependency check now checks both worker_group_id and worker_group name, so worker_group is still treated as valid data rather than deprecated fallback only.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed on the current branch: workerGroup remains a non-deprecated compatibility field, and worker group deletion dependency checks match environment relations by worker_group_id or by worker_group name.


if (CollectionUtils.isNotEmpty(environmentWorkerGroupRelations)) {
throw new ServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.dolphinscheduler.common.enums.AuthorizationType;
import org.apache.dolphinscheduler.common.enums.UserType;
import org.apache.dolphinscheduler.common.enums.WorkflowExecutionStatus;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.entity.WorkflowInstance;
Expand All @@ -44,6 +45,7 @@
import org.apache.dolphinscheduler.registry.api.enums.RegistryNodeType;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -63,6 +65,8 @@
import org.slf4j.LoggerFactory;
import org.springframework.dao.DuplicateKeyException;

import com.baomidou.mybatisplus.core.conditions.Wrapper;

@ExtendWith(MockitoExtension.class)
@MockitoSettings(strictness = Strictness.LENIENT)
public class WorkerGroupServiceTest {
Expand Down Expand Up @@ -260,6 +264,65 @@ public void giveValidParams_whenDeleteWorkerGroupById_expectSuccess() {
assertDoesNotThrow(() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
public void giveEnvironmentRelationMatchedByWorkerGroupId_whenDeleteWorkerGroupById_expectEnvironmentDependency() {
User loginUser = getLoginUser();
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
when(workerGroupDao.queryById(1)).thenReturn(workerGroup);
when(workflowInstanceDao.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null);
when(taskDefinitionDao.queryByWorkerGroup(Mockito.any())).thenReturn(null);
when(scheduleDao.queryScheduleByWorkerGroup(Mockito.any())).thenReturn(null);

EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setWorkerGroupId(workerGroup.getId());
relation.setWorkerGroup("stale-" + workerGroup.getName());
when(environmentWorkerGroupRelationMapper.selectList(Mockito.any())).thenAnswer(invocation -> {
Wrapper<EnvironmentWorkerGroupRelation> wrapper = invocation.getArgument(0);
String sqlSegment = wrapper.getSqlSegment();
if (sqlSegment.contains("worker_group_id") && sqlSegment.contains("worker_group")) {
return Collections.singletonList(relation);
}
return Collections.emptyList();
});

assertThrowsServiceException(Status.WORKER_GROUP_DEPENDENT_ENVIRONMENT_EXISTS,
() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
public void giveEnvironmentRelationMatchedOnlyByLegacyNameWithDifferentWorkerGroupId_whenDeleteWorkerGroupById_expectSuccess() {
User loginUser = getLoginUser();
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.WORKER_GROUP, 1,
WORKER_GROUP_DELETE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.WORKER_GROUP, null, 1,
baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1);
when(workerGroupDao.queryById(1)).thenReturn(workerGroup);
when(workflowInstanceDao.queryByWorkerGroupNameAndStatus(workerGroup.getName(),
WorkflowExecutionStatus.NOT_TERMINAL_STATES)).thenReturn(null);
when(taskDefinitionDao.queryByWorkerGroup(Mockito.any())).thenReturn(null);
when(scheduleDao.queryScheduleByWorkerGroup(Mockito.any())).thenReturn(null);

EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setWorkerGroupId(2);
relation.setWorkerGroup(workerGroup.getName());
when(environmentWorkerGroupRelationMapper.selectList(Mockito.any())).thenAnswer(invocation -> {
Wrapper<EnvironmentWorkerGroupRelation> wrapper = invocation.getArgument(0);
String sqlSegment = wrapper.getSqlSegment();
if (sqlSegment.contains("worker_group") && !sqlSegment.contains("worker_group_id IS NULL")) {
return Collections.singletonList(relation);
}
return Collections.emptyList();
});

assertDoesNotThrow(() -> workerGroupService.deleteWorkerGroupById(loginUser, 1));
}

@Test
public void testQueryAllGroupWithDefault() {
List<String> workerGroups = workerGroupService.queryAllGroup(getLoginUser());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@
import org.apache.dolphinscheduler.dao.entity.Environment;
import org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation;
import org.apache.dolphinscheduler.dao.entity.User;
import org.apache.dolphinscheduler.dao.entity.WorkerGroup;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentMapper;
import org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper;
import org.apache.dolphinscheduler.dao.repository.TaskDefinitionDao;
import org.apache.dolphinscheduler.dao.repository.WorkerGroupDao;

import org.apache.commons.collections4.CollectionUtils;

Expand All @@ -55,6 +57,7 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.ArgumentCaptor;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.MockedStatic;
Expand Down Expand Up @@ -88,6 +91,9 @@ public class EnvironmentServiceTest {
@Mock
private TaskDefinitionDao taskDefinitionDao;

@Mock
private WorkerGroupDao workerGroupDao;

@Mock
private ResourcePermissionCheckService resourcePermissionCheckService;

Expand Down Expand Up @@ -140,6 +146,26 @@ public void testCreateEnvironment() {
}
}

@Test
public void testCreateEnvironmentWithWorkerGroupId() {
User adminUser = getAdminUser();
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT,
adminUser.getId(), ENVIRONMENT_CREATE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null,
0, baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1, "default");
when(environmentMapper.insert(any(Environment.class))).thenReturn(1);
when(workerGroupDao.queryWorkerGroupByName("default")).thenReturn(Collections.singletonList(workerGroup));
when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1);

environmentService.createEnvironment(adminUser, "testName", "test", "test", workerGroups);

ArgumentCaptor<EnvironmentWorkerGroupRelation> relationCaptor =
ArgumentCaptor.forClass(EnvironmentWorkerGroupRelation.class);
Mockito.verify(relationMapper).insert(relationCaptor.capture());
assertEquals(workerGroup.getId(), relationCaptor.getValue().getWorkerGroupId());
}

@Test
public void testCheckParams() {
assertThrowsServiceException(Status.ENVIRONMENT_WORKER_GROUPS_IS_INVALID,
Expand Down Expand Up @@ -193,6 +219,27 @@ public void testUpdateEnvironmentByCode() {
""));
}

@Test
public void testUpdateEnvironmentByCodeWithWorkerGroupId() {
User adminUser = getAdminUser();
when(resourcePermissionCheckService.operationPermissionCheck(AuthorizationType.ENVIRONMENT,
adminUser.getId(), ENVIRONMENT_UPDATE, baseServiceLogger)).thenReturn(true);
when(resourcePermissionCheckService.resourcePermissionCheck(AuthorizationType.ENVIRONMENT, null,
0, baseServiceLogger)).thenReturn(true);
WorkerGroup workerGroup = getWorkerGroup(1, "default");
when(environmentMapper.update(any(Environment.class), any(Wrapper.class))).thenReturn(1);
when(relationMapper.queryByEnvironmentCode(1L)).thenReturn(Collections.emptyList());
when(workerGroupDao.queryWorkerGroupByName("default")).thenReturn(Collections.singletonList(workerGroup));
when(relationMapper.insert(any(EnvironmentWorkerGroupRelation.class))).thenReturn(1);

environmentService.updateEnvironmentByCode(adminUser, 1L, "testName", "test", "test", workerGroups);

ArgumentCaptor<EnvironmentWorkerGroupRelation> relationCaptor =
ArgumentCaptor.forClass(EnvironmentWorkerGroupRelation.class);
Mockito.verify(relationMapper).insert(relationCaptor.capture());
assertEquals(workerGroup.getId(), relationCaptor.getValue().getWorkerGroupId());
}

@Test
public void testQueryAllEnvironmentList() {
when(resourcePermissionCheckService.userOwnedResourceIdsAcquisition(AuthorizationType.ENVIRONMENT,
Expand Down Expand Up @@ -325,6 +372,13 @@ private EnvironmentWorkerGroupRelation getEnvironmentWorkerGroup() {
return relation;
}

private WorkerGroup getWorkerGroup(int id, String name) {
WorkerGroup workerGroup = new WorkerGroup();
workerGroup.setId(id);
workerGroup.setName(name);
return workerGroup;
}

/**
* create an environment description
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ public class EnvironmentWorkerGroupRelation {
/**
* worker group id
*/
private Integer workerGroupId;

/**
* worker group name
*/

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/**
* worker group name
*/

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in f592c03. Removed the extra worker group name comment.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed on the current branch: EnvironmentWorkerGroupRelation keeps workerGroup without a deprecated marker in that area.

private String workerGroup;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
<mapper namespace="org.apache.dolphinscheduler.dao.mapper.EnvironmentWorkerGroupRelationMapper">
<sql id="baseSql">
id, environment_code, worker_group, operator, create_time, update_time
id, environment_code, worker_group_id, worker_group, operator, create_time, update_time
</sql>
<select id="queryByEnvironmentCode" resultType="org.apache.dolphinscheduler.dao.entity.EnvironmentWorkerGroupRelation">
select
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1158,6 +1158,7 @@ CREATE TABLE t_ds_environment_worker_group_relation
(
id int NOT NULL AUTO_INCREMENT,
environment_code bigint(20) NOT NULL,
worker_group_id int DEFAULT NULL,
worker_group varchar(255) NOT NULL,
operator int DEFAULT NULL,
create_time timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1149,7 +1149,8 @@ DROP TABLE IF EXISTS `t_ds_environment_worker_group_relation`;
CREATE TABLE `t_ds_environment_worker_group_relation` (
`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',
`environment_code` bigint(20) NOT NULL COMMENT 'environment code',
`worker_group` varchar(255) NOT NULL COMMENT 'worker group id',
`worker_group_id` int(11) DEFAULT NULL COMMENT 'worker group id',
`worker_group` varchar(255) NOT NULL COMMENT 'worker group name',

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why worker_group is not null but worker_group_id can be null?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 05ef512. The base schema now defines worker_group_id as NOT NULL. For upgrades, DDL adds it nullable first, DML backfills it from t_ds_worker_group, and then the column is changed to NOT NULL for MySQL and PostgreSQL.

`operator` int(11) DEFAULT NULL COMMENT 'operator user id',
`create_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
`update_time` timestamp NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,7 @@ DROP TABLE IF EXISTS t_ds_environment_worker_group_relation;
CREATE TABLE t_ds_environment_worker_group_relation (
id serial NOT NULL,
environment_code bigint NOT NULL,
worker_group_id int DEFAULT NULL,
worker_group varchar(255) NOT NULL,
operator int DEFAULT NULL,
create_time timestamp DEFAULT NULL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,7 @@
*/

ALTER TABLE `t_ds_serial_command`
MODIFY COLUMN `workflow_definition_code` BIGINT(20) NOT NULL COMMENT 'workflow definition code';
MODIFY COLUMN `workflow_definition_code` BIGINT(20) NOT NULL COMMENT 'workflow definition code';

ALTER TABLE `t_ds_environment_worker_group_relation`
ADD COLUMN `worker_group_id` int(11) DEFAULT NULL COMMENT 'worker group id';

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You should put it into 3.5.0

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 05ef512. I moved the worker_group_id upgrade scripts from 3.4.1_schema to 3.5.0_schema for both MySQL and PostgreSQL. The 3.4.1 upgrade scripts are no longer changed in this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed on the current branch: the MySQL worker_group_id upgrade DDL/DML is under sql/upgrade/3.5.0_schema, and the 3.4.1 MySQL DDL no longer contains this change.

Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

UPDATE `t_ds_environment_worker_group_relation` ewgr
INNER JOIN `t_ds_worker_group` wg ON ewgr.`worker_group` = wg.`name`
SET ewgr.`worker_group_id` = wg.`id`
WHERE ewgr.`worker_group_id` IS NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@
CREATE SEQUENCE IF NOT EXISTS t_ds_task_instance_context_id_seq;
ALTER TABLE t_ds_task_instance_context
ALTER COLUMN id SET DEFAULT nextval('t_ds_task_instance_context_id_seq'::regclass);

ALTER TABLE t_ds_environment_worker_group_relation ADD COLUMN IF NOT EXISTS worker_group_id int DEFAULT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

UPDATE t_ds_environment_worker_group_relation ewgr
SET worker_group_id = wg.id
FROM t_ds_worker_group wg
WHERE ewgr.worker_group_id IS NULL
AND ewgr.worker_group = wg.name;
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ private EnvironmentWorkerGroupRelation insertOne() {
// insertOne
EnvironmentWorkerGroupRelation relation = new EnvironmentWorkerGroupRelation();
relation.setEnvironmentCode(1L);
relation.setWorkerGroupId(1);
relation.setWorkerGroup("default");
relation.setOperator(1);
relation.setUpdateTime(new Date());
Expand All @@ -76,6 +77,7 @@ public void testQuery() {
// query
List<EnvironmentWorkerGroupRelation> relations = environmentWorkerGroupRelationMapper.selectList(null);
Assertions.assertEquals(relations.size(), 1);
Assertions.assertEquals(1, relations.get(0).getWorkerGroupId());
}

@Test
Expand Down
Loading