EventsBag support for archive.SaveSplit method

Hello,

We are trying to adopt Aspose.ZIP for creating multi-volume archives.
One of the problems we have is that we package relatively large archives with a large number of entries from streams, and we would like better control over when the streams are closed, so that entry streams are not kept open for extended periods of time.
Note that our streams come from a data source that does not provide the length, so we cannot use entry.CompressionProgressed for this purpose.

In the current contracts, the archive.Save method accepts an instance of ArchiveSaveOptions, which among other things accepts an EventsBag instance. That lets us handle the event raised when an entry is compressed, where we can close the source stream.
But the archive.SaveSplit method does not accept an EventsBag in any way.

With that said, I have two questions.

  1. Is it possible/planned to add support of EventsBag to SaveSplit method, or alternatively expose more events on the Entry level to signal the full processing of the entry?
  2. Is it possible/planned to add additional events to EventsBag to know when entry is starting to be written, or maybe expose some sort of OpenImmediately flag on the Entry creation for streams for better control of when streams are opening?

Thank you in advance.

Can you please clarify the specific version of Aspose.ZIP you are using and provide more details about the functionality you are trying to achieve with the SaveSplit method?

We are using the most recent version of Aspose.Zip (24.7.0).

The exact functionality is to create large multi-volume archives from the data in a set of streams.
The streams are ‘expensive’ to keep open for a long period of time, and they do not report their length, so we cannot use the ArchiveEntry.CompressionProgressed event for this purpose.

So we need a way to know for sure

  • when Aspose is done working with the entry stream
  • when Aspose is about to start working with the stream (ideally)

We need this in order to reduce the time the stream is open but idling.

Hello @shtaff,

  1. Your suggestion seems reasonable and worth implementing, so yes, we’ll expand SplitArchiveSaveOptions with EventsBag or similar.

  2. Such reporting is cogent as well. If an entry source is a FileInfo or a file path, the user can control whether it opens immediately or later within the archive.Save method. If I’ve understood you right, you need a similar feature for any stream, so that it opens just in time.

I hope you’ll receive both of these features in an upcoming version.

@shtaff
We have opened the following new ticket(s) in our internal issue tracking system and will deliver their fixes according to the terms mentioned in Free Support Policies.

Issue ID(s): ZIPNET-1189,ZIPNET-1190

You can obtain Paid Support Services if you need support on a priority basis, along with the direct access to our Paid Support management team.

Hello @Eugene.Shashkov
Thank you for your response, we are looking forward to having this functionality available for use.

A small clarification for point 2: in our case (and overall for better flexibility) it would be helpful to be able both to open the stream just in time for it to be used and to close it as soon as it is no longer needed.

Hello @shtaff

  1. EventsBag for multi-volume archive is implemented.
  2. Entry accessed event is introduced.

Hello @Eugene.Shashkov
Thanks a lot, we will work on adopting this when time permits.

Hello @Eugene.Shashkov
We have integrated the new version and the changes mostly work well, but there is one issue and one additional request to consider.

Issue: The stream is accessed immediately when it is added to the archive (specifically its CanSeek property), and that leads to the stream being opened right away upon add, not during the SaveSplit call.

Request: is it possible to add the ability to provide a “Stream Open Delegate” instead of an actual stream, so that the delegate is called to retrieve a stream during the save method and the stream is created right before it is used?

Please consider these and let us know if these modifications are possible.

Thank you in advance!

Hello @shtaff,
Introducing such delegate is possible and makes sense.

It is correct that the CreateEntry method checks whether the stream is seekable and accesses its CanSeek property. Does your stream get opened by that?

Below is the scenario which EntryAccessed is dedicated for:

using (var archive = new Archive())
{
    archive.CreateEntry("first", s1);
    archive.CreateEntry("second", s2);
    // ...

    EventsBag eb = new EventsBag();
    eb.EntryAccessed += delegate (object sender, EntryEventArgs args)
    {
        if (args.Entry.Name.Equals("first"))
            Open(s1);
        else if (args.Entry.Name.Equals("second"))
            Open(s2);
        // ...
    };

    archive.SaveSplit("dir", new SplitArchiveSaveOptions("part", 65536) { EventsBag = eb });
}

Is it correct that your streams become opened before EntryAccessed invocation?

That is correct, our stream opens when it is used in any way, including through the CanSeek property, because of the nature of our underlying store.
This behavior leads me to ask whether it is possible either to defer the CanSeek call or to allow specifying some kind of delegate that creates the stream for an entry, so that the delegate is invoked right before the stream is about to be read.
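
Until the library offers something like that, one caller-side workaround could be a thin wrapper that answers CanSeek without touching the store and opens the real stream only on the first Read. The following is a minimal sketch, not Aspose.ZIP code: LazyReadStream is a hypothetical name, and it assumes the library only needs CanSeek at CreateEntry time and reads the source sequentially afterwards, which this thread does not confirm.

```csharp
using System;
using System.IO;

// Hypothetical wrapper (not part of Aspose.ZIP): the real stream is created
// by the supplied factory on the first Read call, not when the wrapper is built.
public sealed class LazyReadStream : Stream
{
    private readonly Func<Stream> _open;
    private Stream? _inner;
    private bool _disposed;

    public LazyReadStream(Func<Stream> open)
    {
        _open = open ?? throw new ArgumentNullException(nameof(open));
    }

    // True once the factory has been invoked.
    public bool IsOpened => _inner != null;

    // Capability checks are answered without opening the underlying store.
    public override bool CanRead => !_disposed;
    public override bool CanSeek => false;
    public override bool CanWrite => false;

    public override int Read(byte[] buffer, int offset, int count)
    {
        if (_disposed) throw new ObjectDisposedException(nameof(LazyReadStream));
        _inner ??= _open(); // open just in time
        return _inner.Read(buffer, offset, count);
    }

    public override void Flush() { }

    // Forward-only: length, position and seeking are not supported.
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }
    public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
    public override void SetLength(long value) => throw new NotSupportedException();
    public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_disposed)
        {
            _disposed = true;
            _inner?.Dispose(); // release the real stream together with the wrapper
            _inner = null;
        }
        base.Dispose(disposing);
    }
}
```

Because the wrapper reports itself as non-seekable, it only fits code paths that accept forward-only sources.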

@shtaff
We have opened the following new ticket(s) in our internal issue tracking system and will deliver their fixes according to the terms mentioned in Free Support Policies.

Issue ID(s): ZIPNET-1198

Hello @Eugene.Shashkov,

We have tested Aspose.ZIP under higher load, and it seems that we will not be able to use your library without just-in-time stream creation and immediate deallocation after use.

Currently we have to create and pass all streams at the time of Entry creation, and each stream has internal buffer of 1 MB (not configurable), and all those streams stay in memory. This creates significant memory load when we are trying to package a ZIP with thousands of entries.

The only way I can think of is to be able to pass a stream open delegate that would allocate a stream just before the entry is written, and then the stream should be disposed and released immediately after it is used. This is more or less in line with the issue that you have created based on our previous request.
Otherwise buffers will build up in memory and may lead to OutOfMemoryException in large data scenarios.

Do you, by any chance, have a tentative ETA for that request, so we can understand whether we can wait for the functionality from your side or should look for alternative libraries?

Thank you in advance.

Hello @shtaff ,
Please try the just-published version 24.9.

There is a new entry composition method CreateEntry(string name, System.Func<Stream> streamProvider, ArchiveEntrySettings newEntrySettings = null). It is available in the product builds for .NET Framework 4.0+ and .NET Standard 2.0.
Sample usage:

using System;
using System.IO;
using Aspose.Zip;

// Factory for the entry's source stream.
Func<Stream> provider = delegate () { return new MemoryStream(); };
using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive())
    {
        archive.CreateEntry("entry1.bin", provider);
        archive.Save(zipFile);
    }
}

I have read your scenario thoroughly.
It would be somewhat better in your case to have a delegate with the Func<in ArchiveEntry, out Stream> signature. However, even with a parameterless delegate your goal can be achieved using events and closures.

Thank you @Eugene.Shashkov, we will take a look at the new version soon, and will get back to you with the results.

As per your last comment: closures will do a better job for us than trying to match source data by entry, since the only data present on the entry is its name, and we may have a hard time matching it back to the source. Our entry names are a combination of several properties of the source item, so keeping this mapping would require some sort of dictionary, and there would be a closure anyway.
But overall, having the entry as an input parameter of the streamProvider delegate may give better flexibility.
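
To illustrate the closure approach described above, here is a minimal sketch with no Aspose.ZIP dependency. SourceItem, OpenFromStore and EntryPlanner are hypothetical names invented for the example; the point is only that each provider captures its own source item, so no name-to-source lookup is needed.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;

// Hypothetical source item; the entry name is derived from several of its properties.
public sealed record SourceItem(string Folder, string Id, string Extension);

public static class EntryPlanner
{
    // Stand-in for the expensive data-store call (an assumption for this sketch).
    public static Stream OpenFromStore(SourceItem item) =>
        new MemoryStream(Encoding.UTF8.GetBytes($"payload of {item.Id}"));

    // Builds (entry name, stream provider) pairs without opening any stream.
    public static List<(string Name, Func<Stream> Provider)> Plan(IEnumerable<SourceItem> items)
    {
        var plan = new List<(string Name, Func<Stream> Provider)>();
        foreach (SourceItem item in items)
        {
            string name = $"{item.Folder}/{item.Id}.{item.Extension}";
            // The lambda captures `item`, so the provider knows its source
            // without any dictionary keyed by entry name.
            plan.Add((name, () => OpenFromStore(item)));
        }
        return plan;
    }
}
```

Each pair would then be passed to archive.CreateEntry(name, provider), the overload introduced in 24.9.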

@Eugene.Shashkov we have checked the new implementation, and from the interface perspective it looks good, but it seems that it still has a memory consumption issue because the streams are not closed by your code right after the entry write completes.

Please see the example below. We are running a .NET 8 console app here.
From what we see when we run this - all streams are opened upon save, but stay in memory until the archive itself is closed.

using Aspose.Zip;

var license = new License();
license.SetLicense(@"C:\Temp\Aspose.Total.NET.lic");

var rnd = new Random();

using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive())
    {
        foreach (var i in Enumerable.Range(1, 100))
        {
            archive.CreateEntry($"entry{i}.bin", () =>
            {
                byte[] data = new byte[1024 * 1024];
                rnd.NextBytes(data);
                return new ConsoleLoggedMemoryStream(i, data);
            }); 
        }
        Console.WriteLine("End adding entries");
        archive.Save(zipFile); // streams are opened but not closed
        Console.WriteLine("End save");
        GC.Collect(); // stream memory is NOT released here, even though streams are not really needed anymore
        Console.WriteLine("End GC collect 1");
        await Task.Delay(1000);
    } // streams are only closed here
    Console.WriteLine("End archive dispose");
}
Console.WriteLine("End file stream dispose");

GC.Collect(); // stream memory is released here
Console.WriteLine("End GC collect 2");

await Task.Delay(1000);

Console.WriteLine("All Done"); 
Console.ReadKey();

public class ConsoleLoggedMemoryStream : MemoryStream
{
    private readonly int _number;

    public ConsoleLoggedMemoryStream(int number, byte[] data) : base(data)
    {
        _number = number; 
        Console.WriteLine("Stream {0} opening", _number);
    }

    public override void Close()
    {
        Console.WriteLine("Stream {0} closing", _number);
        base.Close();
    }
}

The expectation was that the library would keep only actively used streams in memory and would release each one as soon as its entry write is complete.
The output of the above sample app would then look more like

Stream 1 opening
Stream 1 closing
Stream 2 opening
Stream 2 closing

and the code inside archive.Save method would be something high level like

foreach (var entry in archive.Entries)
{
    using (var entryStream = entry.StreamProvider())
    {
        // write the entry data all the way
    }
}

Can you please take a look and let us know if this makes sense?
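
The expected lifetime can be sketched as a runnable example that does not use Aspose.ZIP at all. SequentialWriter and TrackedStream are hypothetical names, and the "save" here only copies bytes to an output stream; the sketch just shows that with a using block per entry the peak number of simultaneously open source streams stays at one, however many entries there are.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Counts live instances so the peak number of open sources can be observed.
public sealed class TrackedStream : MemoryStream
{
    public static int Open;
    public static int Peak;
    private bool _closed;

    public TrackedStream(byte[] data) : base(data)
    {
        Open++;
        Peak = Math.Max(Peak, Open);
    }

    protected override void Dispose(bool disposing)
    {
        if (disposing && !_closed)
        {
            _closed = true; // guard against double disposal
            Open--;
        }
        base.Dispose(disposing);
    }
}

public static class SequentialWriter
{
    // Hypothetical stand-in for the save loop: each source is created,
    // fully consumed and disposed before the next provider is invoked.
    public static void WriteAll(IEnumerable<Func<Stream>> providers, Stream output)
    {
        foreach (Func<Stream> provider in providers)
        {
            using (Stream source = provider())
            {
                source.CopyTo(output);
            }
        }
    }

    // Runs the loop over entryCount providers and returns the peak number
    // of source streams that were open at the same time.
    public static int Demo(int entryCount)
    {
        TrackedStream.Open = 0;
        TrackedStream.Peak = 0;
        var providers = new List<Func<Stream>>();
        for (int i = 0; i < entryCount; i++)
        {
            providers.Add(() => new TrackedStream(new byte[1024]));
        }
        using var output = new MemoryStream();
        WriteAll(providers, output);
        return TrackedStream.Peak;
    }
}
```

Calling SequentialWriter.Demo(100) returns 1, because each source is disposed before the next one is created.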

Your sample does not use events and closures, let me rewrite it:

using System.Reflection;
using System.Text.RegularExpressions;
using Aspose.Zip;
using Aspose.Zip.Saving;

var license = new License(); license.SetLicense(@"Aspose.ZIP.NET.lic");
const int N = 5000;
var rnd = new Random();
Regex numberExtractor = new Regex(@"^entry(?<number>\d+)\.bin$", RegexOptions.Compiled);

using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive(new ArchiveEntrySettings(CompressionSettings.Store)))
    {
        ConsoleLoggedMemoryStream[] sourceStreams = new ConsoleLoggedMemoryStream[N];
        for (int i = 0; i < N; i++)
        {
            int i1 = i;
            archive.CreateEntry($"entry{i}.bin", () =>
            {
                byte[] data = new byte[1024 * 1024];
                rnd.NextBytes(data);
                return sourceStreams[i1] = new ConsoleLoggedMemoryStream(i1, data);
            });
        }

        Console.WriteLine("End adding entries");
        
        int currentlyCompressedStreamNumber = -1;
        
        ArchiveSaveOptions saveOptions = new ArchiveSaveOptions { EventsBag = new EventsBag() };
        saveOptions.EventsBag.EntryAccessed += delegate(object? sender, EntryEventArgs eventArgs)
        {
            Console.WriteLine($"Starting {eventArgs.Entry.Name} compression"); 
            // In a single thread, entries are processed in order.
            currentlyCompressedStreamNumber = int.Parse(numberExtractor.Match(eventArgs.Entry.Name).Groups["number"].Value);
            // Or just currentlyCompressedStreamNumber++, which is fine for entries named like these.
        };
        saveOptions.EventsBag.EntryCompressed += delegate(object? sender, CancelEntryEventArgs eventArgs)
        {
            Console.WriteLine($"Entry {eventArgs.Entry.Name} proceeded"); 
            sourceStreams[currentlyCompressedStreamNumber].Close();
        };
        
        archive.Save(zipFile, saveOptions); 
        Console.WriteLine("End save");
        GC.Collect(); 
        // stream memory is NOT released here, even though streams are not really needed anymore - still true without hack
        Console.WriteLine("End GC collect 1");

    } // streams are only closed here - now twice, first time was within EntryCompressed 
    Console.WriteLine("End archive dispose");
}
Console.WriteLine("End file stream dispose");
GC.Collect(); // stream memory is released here
Console.WriteLine("End GC collect 2");
Console.WriteLine("All Done");

public class ConsoleLoggedMemoryStream : MemoryStream
{
    private readonly int _number;
    private static FieldInfo bufferField;

    static ConsoleLoggedMemoryStream()
    {
        bufferField = typeof(MemoryStream).GetField("_buffer", BindingFlags.NonPublic | BindingFlags.Instance);
    }
    public ConsoleLoggedMemoryStream(int number, byte[] data) : base(data, 0, data.Length, true, true)
    {
        _number = number;
        Console.WriteLine("Stream {0} opening", _number);
    }
    public override void Close()
    {
        Console.WriteLine("Stream {0} closing", _number);

        //DIRTY HACK - release internal buffer of MemoryStream
        //bufferField.SetValue(this, null);

        base.Close();
    }
}

All reasoning below assumes you compress in a single thread.

First, you need to keep references to all the actual streams; the sourceStreams array stores them.
Then, in the EntryAccessed event handler you determine which stream is being compressed: it is sourceStreams[currentlyCompressedStreamNumber]. Right after the entry compression is done, you can close the stream yourself in the EntryCompressed event handler.

It is correct that entry sources are not disposed within archive.Save at least because it is possible to add more entries and save the archive twice.
__
I do not know when your production stream actually releases its memory, but in this sample with a MemoryStream descendant the memory is not released on ConsoleLoggedMemoryStream.Close. This is simply how MemoryStream works: it keeps its internal buffer after Close. As a result, the LOH grows to several gigabytes in my sample with 5000 entries.

To make the sample consume little memory, we need to force the internal buffer to be released in ConsoleLoggedMemoryStream.Close. A dirty hack is included (commented out) to clean up the memory. With timely cleanup, the LOH stays within several megabytes. This largely shows that your goal is feasible.
__
BTW, have you played with multi-threaded ZIP composition? If all your input data has the same order of magnitude and the RAM is big enough you can benefit a lot. It’ll require dodgy stream management but still seems feasible.

Hello @Eugene.Shashkov
Thank you for your response.

You are correct that my example does not use closures or events; I was trying to demonstrate the core issue with the memory consumption of Archive.
The expectation was that, since the stream is allocated within Save, it would also be released within Save (both stream.Close/Dispose called and the stream reference released), as if there were a using statement. Without that, the memory pressure point is just moved later, from the CreateEntry phase to the Save phase.

I understand the possibility of multiple Save calls on the same Archive instance, but in that case the second Save call could, or maybe even should, call the streamProvider delegate a second time and get another instance of the stream with the same content. That requires the streamProvider delegate to be idempotent, but that makes sense for a factory method. And it is not an issue at all for the majority of consumers, who likely call the save method only once.

Another case where the delegate should be called a second time, instead of reusing the stream from the first call, is a stream that is not seekable: once it has been read to the end, it cannot be used again anyway. A second delegate call makes it possible to allocate another instance of the stream at position 0.

In the case you presented, a second Save call will likely not work since the streams are now closed, and, as you correctly mentioned, the memory may not be fully released, depending on the implementation of stream.Close, since the archive still keeps a reference to the closed stream until the archive itself is closed or disposed.

So, overall, the expectation is that a stream is created right before its entry is written and is closed, with its reference released, right after that, and that a second Save call creates a new stream with the same content by calling the same delegate instance once again.
This can either be the default behavior or be enabled by some kind of parameter/option on the archive.CreateEntry/archive.Save method.
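
A minimal sketch of that re-save expectation, again with no Aspose.ZIP dependency: ResaveSketch and SavePass are hypothetical names, and a "save pass" here just drains the source into a byte array. Each pass asks the provider for a fresh stream, so nothing has to be rewound and both passes see the same content.

```csharp
using System;
using System.IO;
using System.Linq;

public static class ResaveSketch
{
    // Hypothetical stand-in for one Save call: obtain a fresh source from the
    // provider, consume it completely, and dispose it before returning.
    public static byte[] SavePass(Func<Stream> provider)
    {
        using Stream source = provider(); // new instance at position 0
        using var output = new MemoryStream();
        source.CopyTo(output);
        return output.ToArray();
    }

    // Runs two passes over the same provider and reports how often the
    // provider was invoked and whether both passes produced equal bytes.
    public static (int Calls, bool SameContent) Demo()
    {
        int calls = 0;
        byte[] payload = { 1, 2, 3, 4 };
        // Idempotent factory: every call yields a stream with the same content.
        Func<Stream> provider = () =>
        {
            calls++;
            return new MemoryStream(payload, writable: false);
        };

        byte[] first = SavePass(provider);
        byte[] second = SavePass(provider);
        return (calls, first.SequenceEqual(second));
    }
}
```

In this sketch the provider is invoked once per pass, which is why it has to behave like an idempotent factory.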

As for the multithreaded scenario, we are currently not looking into that, as we are still struggling to make the library work for our requirements in the simple case. We will consider going multithreaded once the single-threaded case works well for us.

Please let me know if all above makes sense to you.

Hello @shtaff,

I agree that streamProvider should be invoked twice on re-save without rewinding the stream it initially returned.

I’ll keep source stream disposal as it is on archive disposal, but I’ll also expand ArchiveSaveOptions with flag indicating whether to dispose entries’ sources immediately. I hope this and EventsBag.EntryCompressed event would be enough for fine stream management.

We may also provide read access to the stream source of the entry, so that it can be handled more conveniently within the EntryCompressed handler or elsewhere.