-
Notifications
You must be signed in to change notification settings - Fork 748
[GOBBLIN-2056] initialize topology specs directly without waitging for listener call… #3937
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,15 +19,13 @@ | |
|
|
||
| import java.io.IOException; | ||
| import java.net.URI; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Map; | ||
| import java.util.Properties; | ||
|
|
||
| import org.apache.commons.lang3.StringUtils; | ||
| import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; | ||
| import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||
| import org.apache.gobblin.util.reflection.GobblinConstructorUtils; | ||
| import org.quartz.CronExpression; | ||
| import org.slf4j.Logger; | ||
| import org.slf4j.LoggerFactory; | ||
|
|
@@ -48,6 +46,7 @@ | |
| import org.apache.gobblin.configuration.State; | ||
| import org.apache.gobblin.instrumented.Instrumented; | ||
| import org.apache.gobblin.metrics.MetricContext; | ||
| import org.apache.gobblin.metrics.ServiceMetricNames; | ||
| import org.apache.gobblin.metrics.Tag; | ||
| import org.apache.gobblin.runtime.api.FlowSpec; | ||
| import org.apache.gobblin.runtime.api.JobSpec; | ||
|
|
@@ -58,12 +57,14 @@ | |
| import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; | ||
| import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; | ||
| import org.apache.gobblin.runtime.spec_catalog.AddSpecResponse; | ||
| import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; | ||
| import org.apache.gobblin.service.ServiceConfigKeys; | ||
| import org.apache.gobblin.metrics.ServiceMetricNames; | ||
| import org.apache.gobblin.service.modules.flowgraph.Dag; | ||
| import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||
| import org.apache.gobblin.service.modules.spec.JobExecutionPlan; | ||
| import org.apache.gobblin.util.ConfigUtils; | ||
| import org.apache.gobblin.util.PropertiesUtils; | ||
| import org.apache.gobblin.util.reflection.GobblinConstructorUtils; | ||
|
|
||
|
|
||
| // Provide base implementation for constructing multi-hops route. | ||
|
|
@@ -73,8 +74,7 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { | |
| // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected | ||
| // to these data structures. | ||
| @Getter | ||
| @Setter | ||
| protected final Map<URI, TopologySpec> topologySpecMap; | ||
| protected final Map<URI, TopologySpec> topologySpecMap = Maps.newConcurrentMap(); | ||
|
|
||
| protected final Config config; | ||
| protected final Logger log; | ||
|
|
@@ -97,35 +97,13 @@ public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { | |
|
|
||
| private Optional<UserQuotaManager> userQuotaManager; | ||
|
|
||
| public BaseFlowToJobSpecCompiler(Config config){ | ||
| this(config,true); | ||
| } | ||
|
|
||
| public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){ | ||
| this(config, Optional.<Logger>absent(), true); | ||
| } | ||
|
|
||
| public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){ | ||
| this(config, log,true); | ||
| } | ||
|
|
||
| public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){ | ||
| this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); | ||
| if (instrumentationEnabled) { | ||
| this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); | ||
| this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); | ||
| this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); | ||
| this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); | ||
| this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); | ||
| } | ||
| else { | ||
| this.metricContext = null; | ||
| this.flowCompilationSuccessFulMeter = Optional.absent(); | ||
| this.flowCompilationFailedMeter = Optional.absent(); | ||
| this.flowCompilationTimer = Optional.absent(); | ||
| this.dataAuthorizationTimer = Optional.absent(); | ||
| } | ||
|
|
||
| public BaseFlowToJobSpecCompiler(Config config, Collection<TopologySpec> topologySpecSet){ | ||
| this.log = LoggerFactory.getLogger(getClass()); | ||
| this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); | ||
| this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); | ||
| this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); | ||
| this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); | ||
| this.dataAuthorizationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.DATA_AUTHORIZATION_TIMER)); | ||
| this.warmStandbyEnabled = ConfigUtils.getBoolean(config, ServiceConfigKeys.GOBBLIN_SERVICE_WARM_STANDBY_ENABLED_KEY, false); | ||
| if (this.warmStandbyEnabled) { | ||
| userQuotaManager = Optional.of(GobblinConstructorUtils.invokeConstructor(UserQuotaManager.class, | ||
|
|
@@ -134,10 +112,12 @@ public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean in | |
| userQuotaManager = Optional.absent(); | ||
| } | ||
|
|
||
| this.topologySpecMap = Maps.newConcurrentMap(); | ||
| topologySpecSet.forEach(this::onAddTopologySpec); | ||
|
|
||
|
|
||
| this.config = config; | ||
|
|
||
| /*** | ||
| /* | ||
| * ETL-5996 | ||
|
||
| * For multi-tenancy, the following needs to be added: | ||
| * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs | ||
|
|
@@ -219,8 +199,6 @@ private AddSpecResponse onAddFlowSpec(FlowSpec flowSpec) { | |
| public AddSpecResponse onAddSpec(Spec addedSpec) { | ||
| if (addedSpec instanceof FlowSpec) { | ||
| return onAddFlowSpec((FlowSpec) addedSpec); | ||
| } else if (addedSpec instanceof TopologySpec) { | ||
| return onAddTopologySpec( (TopologySpec) addedSpec); | ||
|
Comment on lines
-222
to
-223
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. is it wise to remove the ability to be a TS listener?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. being a TS listener feels of no use to me. There should be no TS listener in gaas.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I equate topology specs w/ executors in the flow graph. since the FG can change dynamically w/o requiring a system restart, it doesn't seem out of the question to change the set of topos w/o a system restart either, given whatever new FG edges could indicate newly defined I agree that's not what we've done thus far, but it's arguably inconvenient to require two separate changes to define a new executor - one to the FG and one to the gaas configs. if it were possible to do both together, I would personally find that appealing |
||
| } | ||
| return new AddSpecResponse(null); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -43,6 +43,7 @@ | |||||
| import org.apache.gobblin.service.modules.orchestration.TimingEventUtils; | ||||||
| import org.apache.gobblin.service.modules.orchestration.UserQuotaManager; | ||||||
| import org.apache.gobblin.service.modules.spec.JobExecutionPlan; | ||||||
| import org.apache.gobblin.service.modules.topology.TopologySpecFactory; | ||||||
| import org.apache.gobblin.service.monitoring.FlowStatusGenerator; | ||||||
| import org.apache.gobblin.util.ClassAliasResolver; | ||||||
| import org.apache.gobblin.util.ConfigUtils; | ||||||
|
|
@@ -73,12 +74,12 @@ public class FlowCompilationValidationHelper { | |||||
|
|
||||||
| @Inject | ||||||
| public FlowCompilationValidationHelper(Config config, SharedFlowMetricsSingleton sharedFlowMetricsSingleton, | ||||||
| UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator) { | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. how do we know the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. TopologySpecFactory is guice based initialized
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes I see that but what is the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Checking Line 508 in 33660f5
One safe guard we can add is both Line 522 in 33660f5
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SpecCompiler gets information about topology specs through orchestrator's onAddSpec. And orchestrator gets info about topology specs because it is a listener on topology spec catalog. |
||||||
| UserQuotaManager userQuotaManager, FlowStatusGenerator flowStatusGenerator, TopologySpecFactory topologySpecFactory) { | ||||||
| try { | ||||||
| String specCompilerClassName = ConfigUtils.getString(config, ServiceConfigKeys.GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY, | ||||||
| ServiceConfigKeys.DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS); | ||||||
| this.specCompiler = (SpecCompiler) ConstructorUtils.invokeConstructor(Class.forName( | ||||||
| new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config); | ||||||
| new ClassAliasResolver<>(SpecCompiler.class).resolve(specCompilerClassName)), config, topologySpecFactory.getTopologies()); | ||||||
| } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException | | ||||||
| ClassNotFoundException e) { | ||||||
| throw new RuntimeException(e); | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,6 +21,7 @@ | |
| import java.net.URI; | ||
| import java.net.URISyntaxException; | ||
| import java.util.Collection; | ||
| import java.util.Collections; | ||
| import java.util.List; | ||
| import java.util.Properties; | ||
|
|
||
|
|
@@ -53,6 +54,7 @@ | |
| import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; | ||
| import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; | ||
| import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; | ||
| import org.apache.gobblin.service.modules.topology.TopologySpecFactory; | ||
| import org.apache.gobblin.service.modules.utils.FlowCompilationValidationHelper; | ||
| import org.apache.gobblin.service.modules.utils.SharedFlowMetricsSingleton; | ||
| import org.apache.gobblin.service.monitoring.FlowStatusGenerator; | ||
|
|
@@ -61,6 +63,7 @@ | |
|
|
||
| import static org.mockito.ArgumentMatchers.anyMap; | ||
| import static org.mockito.Mockito.doNothing; | ||
| import static org.mockito.Mockito.doReturn; | ||
| import static org.mockito.Mockito.mock; | ||
|
|
||
|
|
||
|
|
@@ -92,6 +95,8 @@ public void setup() throws Exception { | |
| cleanUpDir(FLOW_SPEC_STORE_DIR); | ||
|
|
||
| Properties orchestratorProperties = new Properties(); | ||
| // Create Spec to play with | ||
| this.topologySpec = initTopologySpec(); | ||
|
|
||
| Properties topologyProperties = new Properties(); | ||
| topologyProperties.put("specStore.fs.dir", TOPOLOGY_SPEC_STORE_DIR); | ||
|
|
@@ -114,6 +119,8 @@ public void setup() throws Exception { | |
| FlowLaunchHandler mockFlowTriggerHandler = mock(FlowLaunchHandler.class); | ||
| DagManager mockDagManager = mock(DagManager.class); | ||
| doNothing().when(mockDagManager).setTopologySpecMap(anyMap()); | ||
| TopologySpecFactory mockedTopologySpecFactory = mock(TopologySpecFactory.class); | ||
| doReturn(Collections.singleton(this.topologySpec)).when(mockedTopologySpecFactory).getTopologies(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prefer the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer this way, because in
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. up to you. I do agree with a spy that |
||
| Config config = ConfigBuilder.create() | ||
| .addPrimitive(MostlyMySqlDagManagementStateStore.DAG_STATESTORE_CLASS_KEY, | ||
| MostlyMySqlDagManagementStateStoreTest.TestMysqlDagStateStore.class.getName()) | ||
|
|
@@ -130,13 +137,12 @@ public void setup() throws Exception { | |
| this.orchestrator = new Orchestrator(ConfigUtils.propertiesToConfig(orchestratorProperties), | ||
| this.topologyCatalog, mockDagManager, Optional.of(logger), mockStatusGenerator, | ||
| Optional.of(mockFlowTriggerHandler), sharedFlowMetricsSingleton, Optional.of(mock(FlowCatalog.class)), Optional.of(dagManagementStateStore), | ||
| new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator)); | ||
| new FlowCompilationValidationHelper(config, sharedFlowMetricsSingleton, mock(UserQuotaManager.class), mockStatusGenerator, | ||
| mockedTopologySpecFactory)); | ||
| this.topologyCatalog.addListener(orchestrator); | ||
| this.flowCatalog.addListener(orchestrator); | ||
| // Start application | ||
| this.serviceLauncher.start(); | ||
| // Create Spec to play with | ||
| this.topologySpec = initTopologySpec(); | ||
| this.flowSpec = initFlowSpec(); | ||
| } | ||
|
|
||
|
|
@@ -244,9 +250,10 @@ public void createTopologySpec() { | |
| } | ||
| // Make sure TopologyCatalog is empty | ||
| Assert.assertTrue(specs.size() == 0, "Spec store should be empty before addition"); | ||
| // Make sure TopologyCatalog Listener is empty | ||
| Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 0, "SpecCompiler should not know about any Topology " | ||
| + "before addition"); | ||
| Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should know about any Topology " | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. desc here makes it sound like the test should be
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "SpecCompiler should know about any Topology irrespective of what is there in the topology catalog" ? how does that sound like test should be There is no precondition. SpecCompiler should just know about the topologies. Number of topologies known to spec compiler should just always be 1 (equals to total number of topologies).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. "know about any" suggested it would apply even if there were > 1 topo specs. couldn't there be more than one? also, if non-empty |
||
| + " irrespective of what is there in the topology catalog"); | ||
| // Make sure TopologyCatalog empty | ||
| Assert.assertTrue(this.topologyCatalog.getSize() == 0, "Topology catalog should contain 0 Spec before addition"); | ||
|
Comment on lines
+255
to
+256
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. seems this asserts that the spec compiler's topos may now potentially deviate from the topo catalog's, which might make the system harder to reason about... is it really a good thing to drop such an invariant?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is a deviation only till everything is initialized. after that, they both should be same (1 in this test).
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prior to this change, there's an "inconsistent timespan", where the compiler doesn't have topo specs initialized. you suggested this might compromise correctness of after this change, sounds like there will be an "inconsistent timespan", where the |
||
|
|
||
| // Create and add Spec | ||
| this.topologyCatalog.put(topologySpec); | ||
|
|
@@ -262,7 +269,7 @@ public void createTopologySpec() { | |
| // Make sure TopologyCatalog has the added Topology | ||
| Assert.assertTrue(specs.size() == 1, "Spec store should contain 1 Spec after addition"); | ||
| // Make sure TopologyCatalog Listener knows about added Topology | ||
| Assert.assertTrue(specCompiler.getTopologySpecMap().size() == 1, "SpecCompiler should contain 1 Spec after addition"); | ||
| Assert.assertTrue(this.topologyCatalog.getSize() == 1, "Topology catalog should contain 1 Spec after addition"); | ||
| } | ||
|
|
||
| @Test (dependsOnMethods = "createTopologySpec") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is the intention here to remove the use of this variable for checking if instrumentation is enabled? It's still used widely in GaaS modules so there should be justification
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Arjun did some refactoring before which made instrumentation required I believe. Can you link that change to explain what's happening here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
truein the above above constructor anyway.