-
Notifications
You must be signed in to change notification settings - Fork 96
[REEF-1978] Adding Checkpoint handler for IMRU master #1429
base: master
Are you sure you want to change the base?
Changes from 3 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,8 +20,10 @@ | |
using System.Globalization; | ||
using System.IO; | ||
using System.Linq; | ||
using Newtonsoft.Json; | ||
using Org.Apache.REEF.Common.Tasks; | ||
using Org.Apache.REEF.IMRU.API; | ||
using Org.Apache.REEF.IMRU.OnREEF.CheckpointHandler; | ||
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; | ||
using Org.Apache.REEF.IMRU.OnREEF.Parameters; | ||
using Org.Apache.REEF.Tang.Annotations; | ||
|
@@ -30,6 +32,7 @@ | |
using Org.Apache.REEF.Tang.Interface; | ||
using Org.Apache.REEF.Tang.Util; | ||
using Org.Apache.REEF.Utilities.Logging; | ||
using Org.Apache.REEF.Wake.Remote; | ||
|
||
namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce | ||
{ | ||
|
@@ -69,6 +72,8 @@ protected override IMRUJobDefinitionBuilder CreateJobDefinitionBuilder(int numbe | |
.SetMapInputPipelineDataConverterConfiguration(MapInputDataConverterConfig(chunkSize)) | ||
.SetMapOutputPipelineDataConverterConfiguration(MapOutputDataConverterConfig(chunkSize)) | ||
.SetPartitionedDatasetConfiguration(PartitionedDatasetConfiguration(numberofMappers)) | ||
.SetResultHandlerConfiguration(BuildResultHandlerConfig()) | ||
.SetCheckpointConfiguration(BuildCheckpointConfig()) | ||
.SetJobName("BroadcastReduce") | ||
.SetNumberOfMappers(numberofMappers) | ||
.SetMapperMemory(mapperMemory) | ||
|
@@ -108,6 +113,19 @@ protected override IConfiguration BuildUpdateFunctionConfig() | |
GenericType<BroadcastSenderReduceReceiverUpdateFunctionFT>.Class).Build(); | ||
} | ||
|
||
/// <summary> | ||
/// Build checkpoint configuration. Subclass can override it. | ||
/// </summary> | ||
protected override IConfiguration BuildCheckpointConfig() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above. It is a sample client class that mainly contains driver configurations. I will change the class to internal. |
||
{ | ||
var filePath = Path.Combine(Path.GetTempPath(), Guid.NewGuid() + "state.txt"); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please use the temp files generated by REEF, not `Path.GetTempPath()`. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
return CheckpointConfigurationModule.ConfigurationModule | ||
.Set(CheckpointConfigurationModule.CheckpointFile, filePath) | ||
.Set(CheckpointConfigurationModule.TaskStateCodec, GenericType<UpdateTaskStateCodec>.Class) | ||
.Build(); | ||
} | ||
|
||
/// <summary> | ||
/// Configuration for Update task state | ||
/// </summary> | ||
|
@@ -290,12 +308,17 @@ internal sealed class BroadcastSenderReduceReceiverUpdateFunctionFT : IUpdateFun | |
private readonly int[] _intArr; | ||
private readonly int _workers; | ||
private readonly UpdateTaskState<int[], int[]> _taskState; | ||
private readonly IIMRUCheckpointHandler _stateHandler; | ||
private readonly ICodec<ITaskState> _stateCodec; | ||
|
||
[Inject] | ||
private BroadcastSenderReduceReceiverUpdateFunctionFT( | ||
[Parameter(typeof(BroadcastReduceConfiguration.NumberOfIterations))] int maxIters, | ||
[Parameter(typeof(BroadcastReduceConfiguration.Dimensions))] int dim, | ||
[Parameter(typeof(BroadcastReduceConfiguration.NumWorkers))] int numWorkers, | ||
[Parameter(typeof(TaskConfigurationOptions.Identifier))] string taskId, | ||
IIMRUCheckpointHandler stateHandler, | ||
ICodec<ITaskState> stateCodec, | ||
ITaskState taskState) | ||
{ | ||
_maxIters = maxIters; | ||
|
@@ -304,6 +327,12 @@ private BroadcastSenderReduceReceiverUpdateFunctionFT( | |
_intArr = new int[_dim]; | ||
_workers = numWorkers; | ||
_taskState = (UpdateTaskState<int[], int[]>)taskState; | ||
|
||
_stateHandler = stateHandler; | ||
_stateCodec = stateCodec; | ||
|
||
int retryNumber; | ||
int.TryParse(taskId[taskId.Length - 1].ToString(), out retryNumber); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -343,6 +372,8 @@ UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Update(int[] inp | |
/// <returns>Map input</returns> | ||
UpdateResult<int[], int[]> IUpdateFunction<int[], int[], int[]>.Initialize() | ||
{ | ||
RestoreState(); | ||
|
||
if (_taskState.Result != null) | ||
{ | ||
Restore(_taskState.Result); | ||
|
@@ -372,7 +403,7 @@ private void SaveState(int[] value) | |
{ | ||
_taskState.Iterations = _iterations; | ||
_taskState.Input = value; | ||
Logger.Log(Level.Info, "State saved: {0}", _taskState.Input[0]); | ||
PersistState(); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -383,7 +414,7 @@ private void SaveResult(int[] value) | |
{ | ||
_taskState.Iterations = _iterations; | ||
_taskState.Result = value; | ||
Logger.Log(Level.Info, "Result saved: {0}", _taskState.Result[0]); | ||
PersistState(); | ||
} | ||
|
||
/// <summary> | ||
|
@@ -397,6 +428,29 @@ private void Restore(int[] d) | |
_intArr[i] = d[i]; | ||
} | ||
} | ||
|
||
private void PersistState() | ||
{ | ||
Logger.Log(Level.Info, "$$$$$$$$$$$ State to save: {0}", _taskState.Input[0]); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please reformat all log lines and consider moving them to more fine grained log levels. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah |
||
Logger.Log(Level.Info, "$$$$$$$$$$$ SaveState:currentState: {0}", JsonConvert.SerializeObject(_taskState)); | ||
|
||
_stateHandler.Persistent(_taskState, _stateCodec); | ||
} | ||
|
||
private void RestoreState() | ||
{ | ||
var obj = (UpdateTaskState<int[], int[]>)_stateHandler.Restore(_stateCodec); | ||
|
||
if (obj != null) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How is `obj` used? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. `obj` is used to update the current state in memory. If it is null, that means the checkpoint handler was not able to get any old state for whatever reason, and the current state in memory stays the same. |
||
{ | ||
Logger.Log(Level.Info, | ||
"$$$$$$$$$$$ restoreState:DeserializeObject: input: {0}, iteration: {1}, result: {2}.", | ||
obj.Input == null ? string.Empty : string.Join(",", obj.Input), | ||
obj.Iterations, | ||
obj.Result == null ? string.Empty : string.Join(",", obj.Result)); | ||
_taskState.Update(obj); | ||
} | ||
} | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
// Licensed to the Apache Software Foundation (ASF) under one | ||
// or more contributor license agreements. See the NOTICE file | ||
// distributed with this work for additional information | ||
// regarding copyright ownership. The ASF licenses this file | ||
// to you under the Apache License, Version 2.0 (the | ||
// "License"); you may not use this file except in compliance | ||
// with the License. You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, | ||
// software distributed under the License is distributed on an | ||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
// KIND, either express or implied. See the License for the | ||
// specific language governing permissions and limitations | ||
// under the License. | ||
|
||
using Newtonsoft.Json; | ||
using Org.Apache.REEF.IMRU.OnREEF.IMRUTasks; | ||
using Org.Apache.REEF.Tang.Annotations; | ||
using Org.Apache.REEF.Utilities; | ||
using Org.Apache.REEF.Wake.Remote; | ||
|
||
namespace Org.Apache.REEF.IMRU.Examples.PipelinedBroadcastReduce | ||
{ | ||
public class UpdateTaskStateCodec : ICodec<ITaskState> | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
{ | ||
[Inject] | ||
UpdateTaskStateCodec() | ||
{ | ||
} | ||
|
||
public ITaskState Decode(byte[] data) | ||
{ | ||
var str = ByteUtilities.ByteArraysToString(data); | ||
return JsonConvert.DeserializeObject<UpdateTaskState<int[], int[]>>(str); | ||
} | ||
|
||
public byte[] Encode(ITaskState taskState) | ||
{ | ||
var state = JsonConvert.SerializeObject(taskState); | ||
return ByteUtilities.StringToByteArrays(state); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer not to use `public abstract` classes as APIs. Consider restructuring this using composition instead of inheritance.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is to follow the existing pattern for getting configurations. It lets clients share the same CreateJobDefinitionBuilder while overriding the configuration in their own way. If we really want to change it, that needs to be done in a different PR, as the change must be consistent across the other methods.