Your basic problem is that you are keeping every pressure map sample in memory, rather than writing each sample out individually and then letting it be garbage collected. Worse, you are doing this in two different places:

The first problem: you serialize your entire list of samples to a single JSON string before writing that string to the file. Instead, as explained in Performance Tips: Optimize Memory Usage, in this situation you should serialize and deserialize directly to and from your file. For instructions on how to do that, see the answers to Can Json.NET serialize / deserialize to / from a stream? and to Serialize JSON to a file.
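For instance, a minimal sketch of that direct-to-file approach might look like the following; recordedData and this.filePath stand in for your own data object and output path, and any serializer settings are omitted for brevity:

// Sketch: stream the JSON directly to the file instead of first building one
// large JSON string in memory. recordedData and this.filePath are assumed to
// be your existing data object and output path.
using (var stream = new FileStream(this.filePath, FileMode.Create))
using (var textWriter = new StreamWriter(stream))
{
    JsonSerializer.CreateDefault().Serialize(textWriter, recordedData);
}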
The second problem: recordedData.pressureData = new List<PressureMap>() accumulates every pressure map sample, and the whole list is then rewritten every time a new sample is taken. The better solution is to write each sample once and then forget it, but the requirement that every sample be nested inside some container objects in the JSON makes it less than obvious how to do that.

So, how can problem #2 be solved?

First, let's modify your data model as follows, splitting the header data out into a separate class:
public class PressureMap
{
    public double[,] PressureMatrix { get; set; }
}

public class CalibrationConfiguration
{
    // Data model not included in question
}

public class RepresentationConfiguration
{
    // Data model not included in question
}

public class RecordedDataHeader
{
    public string SoftwareVersion { get; set; }
    public CalibrationConfiguration CalibrationConfiguration { get; set; }
    public RepresentationConfiguration RepresentationConfiguration { get; set; }
}

public class RecordedData
{
    // Ensure the header is serialized first.
    [JsonProperty(Order = 1)]
    public RecordedDataHeader RecordedDataHeader { get; set; }

    // Ensure the pressure data is serialized last.
    [JsonProperty(Order = 2)]
    public IEnumerable<PressureMap> PressureData { get; set; }
}
Option #1 is a version of the producer-consumer pattern. It involves spinning up two threads: one to generate PressureMap samples and one to serialize the RecordedData. The first thread generates samples and adds them to a BlockingCollection<PressureMap> that is passed to the second thread. The second thread then serializes BlockingCollection<PressureMap>.GetConsumingEnumerable() as the value of RecordedData.PressureData.

The following code gives a skeleton of how to do this:
var sampleCount = 400;   // Or whatever stopping criterion you prefer
var sampleInterval = 10; // in ms

using (var pressureData = new BlockingCollection<PressureMap>())
{
    // Adapted from
    // https://docs.microsoft.com/en-us/dotnet/standard/collections/thread-safe/blockingcollection-overview
    // https://docs.microsoft.com/en-us/dotnet/api/system.collections.concurrent.blockingcollection-1?view=netframework-4.7.2

    // Spin up a Task to sample the pressure maps
    using (Task t1 = Task.Factory.StartNew(() =>
    {
        for (int i = 0; i < sampleCount; i++)
        {
            var data = GetPressureMap(i);
            Console.WriteLine("Generated sample {0}", i);
            pressureData.Add(data);
            System.Threading.Thread.Sleep(sampleInterval);
        }
        pressureData.CompleteAdding();
    }))
    {
        // Spin up a Task to consume the BlockingCollection
        using (Task t2 = Task.Factory.StartNew(() =>
        {
            var recordedDataHeader = new RecordedDataHeader
            {
                SoftwareVersion = softwareVersion,
                CalibrationConfiguration = calibrationConfiguration,
                RepresentationConfiguration = representationConfiguration,
            };

            var settings = new JsonSerializerSettings
            {
                ContractResolver = new CamelCasePropertyNamesContractResolver(),
            };

            using (var stream = new FileStream(this.filePath, FileMode.Create))
            using (var textWriter = new StreamWriter(stream))
            using (var jsonWriter = new JsonTextWriter(textWriter))
            {
                int j = 0;

                var query = pressureData
                    .GetConsumingEnumerable()
                    .Select(p =>
                    {
                        // Flush the writer periodically in case the process terminates abnormally
                        jsonWriter.Flush();
                        Console.WriteLine("Serializing item {0}", j++);
                        return p;
                    });

                var recordedData = new RecordedData
                {
                    RecordedDataHeader = recordedDataHeader,
                    // Since PressureData is declared as IEnumerable<PressureMap>, evaluation will be lazy.
                    PressureData = query,
                };

                Console.WriteLine("Beginning serialization of {0} to {1}:", recordedData, this.filePath);
                JsonSerializer.CreateDefault(settings).Serialize(textWriter, recordedData);
                Console.WriteLine("Finished serialization of {0} to {1}.", recordedData, this.filePath);
            }
        }))
        {
            Task.WaitAll(t1, t2);
        }
    }
}
Notes:

- This solution takes advantage of the fact that, when serializing an IEnumerable<T>, Json.NET will not materialize the enumerable into a list. Instead, it makes full use of lazy evaluation and simply enumerates through it, writing and then forgetting each individual item it encounters.
- The first thread samples pressure maps and adds them to the blocking collection. The second thread wraps the blocking collection in an IEnumerable<PressureMap> and then serializes that as RecordedData.PressureData.
- During serialization, the serializer enumerates through the IEnumerable<PressureMap>, streaming each item to the JSON file and then proceeding to the next, effectively blocking until one becomes available.
- You will need to do some experimentation to make sure the serialization thread can "keep up" with the sampling thread, possibly by setting a BoundedCapacity during construction (a sketch follows these notes). If it cannot, you may need to adopt a different strategy.
- PressureMap GetPressureMap(int count) should be some method of yours (not shown in the question) that returns the current pressure map sample.
- In this technique the JSON file remains open for the duration of the sampling session. If sampling terminates abnormally, the file may be truncated. The code above attempts to mitigate this by flushing the writer periodically.
- While serialization no longer requires an unbounded amount of memory, deserializing a RecordedData later will deserialize the PressureData array into a concrete List<PressureMap>. This may cause memory problems during downstream processing (a possible streaming workaround is sketched after the demo-fiddle link below).
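As a sketch of the BoundedCapacity suggestion above (the capacity of 100 is an arbitrary assumption, not a value from this answer, and should be tuned to your sampling and serialization rates):

// Sketch only: bound the collection so the sampling thread blocks on Add() once
// 100 samples are pending, rather than queueing samples without limit.
// The value 100 is an assumption; tune it for your workload.
using (var pressureData = new BlockingCollection<PressureMap>(boundedCapacity: 100))
{
    // ... start the producer and consumer tasks exactly as in the skeleton above ...
}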
Demo fiddle #1 here.
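If the concrete List<PressureMap> created during a later deserialization is a concern, here is a rough sketch (my addition, not part of the fiddle above) of reading the pressureData array back one item at a time with a JsonTextReader, so that each PressureMap can be processed and then discarded. It assumes the camelCase contract resolver used above, so the property appears in the file as "pressureData":

// Sketch only: stream RecordedData.PressureData back item by item instead of
// materializing a List<PressureMap>. Requires the Newtonsoft.Json and System.IO namespaces.
using (var stream = File.OpenRead(filePath))
using (var textReader = new StreamReader(stream))
using (var jsonReader = new JsonTextReader(textReader))
{
    var serializer = JsonSerializer.CreateDefault();
    while (jsonReader.Read())
    {
        if (jsonReader.TokenType == JsonToken.PropertyName && (string)jsonReader.Value == "pressureData")
        {
            jsonReader.Read(); // Advance to the StartArray token.
            while (jsonReader.Read() && jsonReader.TokenType == JsonToken.StartObject)
            {
                var map = serializer.Deserialize<PressureMap>(jsonReader);
                // Process map here, then let it be garbage collected.
            }
        }
    }
}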
Option #2 is to switch from a single JSON file to a newline delimited JSON file. Such a file consists of a sequence of JSON objects separated by newline characters. In your case, you would make the first object contain the RecordedDataHeader information and have the subsequent objects be of type PressureMap:
var sampleCount = 100; // Or whatever
var sampleInterval = 10;

var recordedDataHeader = new RecordedDataHeader
{
    SoftwareVersion = softwareVersion,
    CalibrationConfiguration = calibrationConfiguration,
    RepresentationConfiguration = representationConfiguration,
};

var settings = new JsonSerializerSettings
{
    ContractResolver = new CamelCasePropertyNamesContractResolver(),
};

// Write the header
Console.WriteLine("Beginning serialization of sample data to {0}.", this.filePath);

using (var stream = new FileStream(this.filePath, FileMode.Create))
{
    JsonExtensions.ToNewlineDelimitedJson(stream, new[] { recordedDataHeader });
}

// Write each sample incrementally
for (int i = 0; i < sampleCount; i++)
{
    Thread.Sleep(sampleInterval);
    Console.WriteLine("Performing sample {0} of {1}", i, sampleCount);
    var map = GetPressureMap(i);

    using (var stream = new FileStream(this.filePath, FileMode.Append))
    {
        JsonExtensions.ToNewlineDelimitedJson(stream, new[] { map });
    }
}

Console.WriteLine("Finished serialization of sample data to {0}.", this.filePath);
Using the extension methods:
public static partial class JsonExtensions
{
    // Adapted from the answer to
    // https://stackoverflow.com/questions/44787652/serialize-as-ndjson-using-json-net
    // by dbc https://stackoverflow.com/users/3744182/dbc
    public static void ToNewlineDelimitedJson<T>(Stream stream, IEnumerable<T> items)
    {
        // Let caller dispose the underlying stream
        using (var textWriter = new StreamWriter(stream, new UTF8Encoding(false, true), 1024, true))
        {
            ToNewlineDelimitedJson(textWriter, items);
        }
    }

    public static void ToNewlineDelimitedJson<T>(TextWriter textWriter, IEnumerable<T> items)
    {
        var serializer = JsonSerializer.CreateDefault();

        foreach (var item in items)
        {
            // Formatting.None is the default; I set it here for clarity.
            using (var writer = new JsonTextWriter(textWriter) { Formatting = Formatting.None, CloseOutput = false })
            {
                serializer.Serialize(writer, item);
            }
            // http://specs.okfnlabs.org/ndjson/
            // Each JSON text MUST conform to the [RFC7159] standard and MUST be written to the stream followed by the newline character \n (0x0A).
            // The newline character MAY be preceded by a carriage return \r (0x0D). The JSON texts MUST NOT contain newlines or carriage returns.
            textWriter.Write("\n");
        }
    }

    // Adapted from the answer to
    // https://stackoverflow.com/questions/29729063/line-delimited-json-serializing-and-de-serializing
    // by Yuval Itzchakov https://stackoverflow.com/users/1870803/yuval-itzchakov
    public static IEnumerable<TBase> FromNewlineDelimitedJson<TBase, THeader, TRow>(TextReader reader)
        where THeader : TBase
        where TRow : TBase
    {
        bool first = true;

        using (var jsonReader = new JsonTextReader(reader) { CloseInput = false, SupportMultipleContent = true })
        {
            var serializer = JsonSerializer.CreateDefault();

            while (jsonReader.Read())
            {
                if (jsonReader.TokenType == JsonToken.Comment)
                    continue;
                if (first)
                {
                    yield return serializer.Deserialize<THeader>(jsonReader);
                    first = false;
                }
                else
                {
                    yield return serializer.Deserialize<TRow>(jsonReader);
                }
            }
        }
    }
}
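With these helpers, the output file contains one JSON object per line: the header object on the first line, followed by one PressureMap object per subsequent line. As a rough illustration only (the values are made up, and the exact property casing depends on your default serializer settings), the file might look like:

{"SoftwareVersion":"1.0.0","CalibrationConfiguration":{...},"RepresentationConfiguration":{...}}
{"PressureMatrix":[[0.0,0.1],[0.2,0.3]]}
{"PressureMatrix":[[0.1,0.2],[0.3,0.4]]}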
Later, you can process the newline delimited JSON file as follows:
using (var stream = File.OpenRead(filePath))
using (var textReader = new StreamReader(stream))
{
    foreach (var obj in JsonExtensions.FromNewlineDelimitedJson<object, RecordedDataHeader, PressureMap>(textReader))
    {
        if (obj is RecordedDataHeader)
        {
            var header = (RecordedDataHeader)obj;
            // Process the header
            Console.WriteLine(JsonConvert.SerializeObject(header));
        }
        else
        {
            var row = (PressureMap)obj;
            // Process the row.
            Console.WriteLine(JsonConvert.SerializeObject(row));
        }
    }
}
Notes:

- This approach looks simpler because samples are appended incrementally to the end of the file rather than inserted inside some overall JSON container.
- With this approach, both serialization and downstream processing can be done with bounded memory use.
- The sample file does not remain open during the sampling session, and so is less likely to be truncated.
- Downstream applications may not have built-in tools for processing newline delimited JSON.
- This strategy may integrate more simply with your current threading code.