EventsBag support for archive.SaveSplit method

Hello @Eugene.Shashkov
Thank you for your response. We are looking forward to having this functionality available.

A small clarification on point 2: in our case (and for better flexibility overall) it would be helpful to be able both to open the stream just in time for it to be used and to close it as soon as it is no longer needed.

Hello @shtaff

  1. EventsBag for multi-volume archive is implemented.
  2. Entry accessed event is introduced.

Hello @Eugene.Shashkov
Thanks a lot, we will work on adopting this when time permits.

Hello @Eugene.Shashkov
We have integrated the new version and the changes mostly work well, but there is one issue and one additional request to consider.

Issue: The stream is accessed immediately when it is added to the archive (specifically its CanSeek property), which causes the stream to be opened right away on add rather than during the SaveSplit call.

Request: is it possible to add the ability to provide a “Stream Open Delegate” instead of a stream, so that the delegate is called during the save method and the stream is created right before it is used?

Please consider these and let us know if these modifications are possible.

Thank you in advance!

Hello @shtaff,
Introducing such a delegate is possible and makes sense.

It is correct that the CreateEntry method checks whether the stream is seekable by accessing its CanSeek property. Does your stream get opened at that point?

Below is the scenario that EntryAccessed is intended for:

using (var archive = new Archive())
{
    archive.CreateEntry("first", s1);
    archive.CreateEntry("second", s2);
    ...

    EventsBag eb = new EventsBag();
    eb.EntryAccessed += delegate (object sender, EntryEventArgs args)
    {
        if (args.Entry.Name.Equals("first"))
            Open(s1);
        else if (args.Entry.Name.Equals("second"))
            Open(s2);
        ...
    };

    archive.SaveSplit("dir", new SplitArchiveSaveOptions("part", 65536) { EventsBag = eb });
}

Is it correct that your streams are opened before EntryAccessed is invoked?

That is correct. Our stream opens when it is used in any way, including a read of the CanSeek property, because of the nature of our underlying store.
For this reason I would like to ask whether it is possible either to defer the CanSeek call or to allow specifying a delegate that creates the stream for an entry, so that the delegate is invoked right before the stream is read.

@shtaff
We have opened the following new ticket(s) in our internal issue tracking system and will deliver their fixes according to the terms mentioned in Free Support Policies.

Issue ID(s): ZIPNET-1198

Hello @Eugene.Shashkov,

We have tested Aspose.ZIP under higher load, and it seems that we will not be able to use your library without just-in-time stream creation and immediate deallocation after use.

Currently we have to create and pass all streams at the time of entry creation. Each stream has an internal buffer of 1 MB (not configurable), and all those streams stay in memory. This creates significant memory load when we try to package a ZIP with thousands of entries.

The only solution I can think of is to pass a stream-open delegate that allocates a stream just before the entry is written, with the stream disposed and released immediately after it is used. This is more or less in line with the issue that you created based on our previous request.
Otherwise buffers will build up in memory and may lead to an OutOfMemoryException in large data scenarios.

Do you, by any chance, have a tentative ETA for that request, so we can understand whether we can wait for the functionality from your side or should look for alternative libraries?

Thank you in advance.

Hello @shtaff ,
Please try the just-published version 24.9.

There is a new entry composition method CreateEntry(string name, System.Func<Stream> streamProvider, ArchiveEntrySettings newEntrySettings = null). It is available in the product builds for .NET Framework 4.0+ and .NET Standard 2.0.
Sample usage:

System.Func<Stream> provider = delegate(){ return new MemoryStream(); };
using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive())
    {
        // Only the provider is registered here; it is invoked later, during Save.
        archive.CreateEntry("entry1.bin", provider);
        archive.Save(zipFile);
    }
}

I have read your scenario thoroughly.
It would be somewhat better in your case to have a delegate with the Func<in ArchiveEntry, out Stream> signature. However, even with a parameterless delegate your goal can be achieved using events and closures.

Thank you @Eugene.Shashkov, we will take a look at the new version soon, and will get back to you with the results.

Regarding your last comment: closures will do a better job for us than trying to match source data by entry, since the only data present on the entry is its name, and we may have a hard time matching it back to the source. Our entry names are a combination of several properties of the source item, so keeping this mapping would require some sort of dictionary, and there would be a closure anyway.
But overall, having the entry as an input parameter of the streamProvider delegate may give better flexibility.
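
To illustrate the closure approach described above, here is a minimal sketch. SourceItem and its properties are hypothetical stand-ins for the real source data, and MemoryStream stands in for the store-backed stream; nothing here is part of Aspose.ZIP itself.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical source items; real payloads would come from the underlying store.
var items = new[]
{
    new SourceItem("invoices", "2024-001", new byte[] { 1, 2, 3 }),
    new SourceItem("invoices", "2024-002", new byte[] { 4, 5 }),
};

// Each provider closes over its own item, so no name-to-source dictionary is needed.
var providers = new List<KeyValuePair<string, Func<Stream>>>();
foreach (SourceItem item in items)
{
    string entryName = $"{item.Folder}/{item.Title}.bin";
    providers.Add(new KeyValuePair<string, Func<Stream>>(
        entryName,
        () => new MemoryStream(item.Payload, writable: false)));
}

// No stream exists until a provider is invoked.
foreach (KeyValuePair<string, Func<Stream>> pair in providers)
{
    using Stream source = pair.Value();
    Console.WriteLine($"{pair.Key}: {source.Length} bytes");
}

// Hypothetical source record; entry names combine several of its properties.
record SourceItem(string Folder, string Title, byte[] Payload);
```

With the CreateEntry(string name, System.Func<Stream> streamProvider, ...) overload mentioned earlier in this thread, each pair would presumably be registered as archive.CreateEntry(pair.Key, pair.Value) instead of being invoked directly.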

@Eugene.Shashkov we have checked the new implementation, and from the interface perspective it looks good, but it seems that it still has a memory consumption issue because the streams are not closed by your code right after the entry write completes.

Please see the example below. We are running a .NET 8 console app here.
What we see when we run this is that all streams are opened upon save but stay in memory until the archive itself is closed.

using Aspose.Zip;

var license = new License();
license.SetLicense(@"C:\Temp\Aspose.Total.NET.lic");

var rnd = new Random();

using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive())
    {
        foreach (var i in Enumerable.Range(1, 100))
        {
            archive.CreateEntry($"entry{i}.bin", () =>
            {
                byte[] data = new byte[1024 * 1024];
                rnd.NextBytes(data);
                return new ConsoleLoggedMemoryStream(i, data);
            }); 
        }
        Console.WriteLine("End adding entries");
        archive.Save(zipFile); // streams are opened but not closed
        Console.WriteLine("End save");
        GC.Collect(); // stream memory is NOT released here, even though streams are not really needed anymore
        Console.WriteLine("End GC collect 1");
        await Task.Delay(1000);
    } // streams are only closed here
    Console.WriteLine("End archive dispose");
}
Console.WriteLine("End file stream dispose");

GC.Collect(); // stream memory is released here
Console.WriteLine("End GC collect 2");

await Task.Delay(1000);

Console.WriteLine("All Done"); 
Console.ReadKey();

public class ConsoleLoggedMemoryStream : MemoryStream
{
    private readonly int _number;

    public ConsoleLoggedMemoryStream(int number, byte[] data) : base(data)
    {
        _number = number; 
        Console.WriteLine("Stream {0} opening", _number);
    }

    public override void Close()
    {
        Console.WriteLine("Stream {0} closing", _number);
        base.Close();
    }
}

The expectation was that the library would keep only actively used streams in memory and would release each one as soon as its entry write is complete.
The output of the above sample app would then be more like

Stream 1 opening
Stream 1 closing
Stream 2 opening
Stream 2 closing

and the code inside archive.Save method would be something high level like

foreach (var entry in archive.Entries)
{
    using (var entryStream = entry.StreamProvider())
    {
        // write the entry data all the way
    }
}

Can you please take a look and let us know if this makes sense?

Your sample does not use events and closures; let me rewrite it:

using System.Reflection;
using System.Text.RegularExpressions;
using Aspose.Zip;
using Aspose.Zip.Saving;

var license = new License(); license.SetLicense(@"Aspose.ZIP.NET.lic");
const int N = 5000;
var rnd = new Random();
Regex numberExtractor = new Regex(@"^entry(?<number>\d+)\.bin$", RegexOptions.Compiled);

using (FileStream zipFile = File.Open("archive.zip", FileMode.Create))
{
    using (var archive = new Archive(new ArchiveEntrySettings(CompressionSettings.Store)))
    {
        ConsoleLoggedMemoryStream[] sourceStreams = new ConsoleLoggedMemoryStream[N];
        for (int i = 0; i < N; i++)
        {
            int i1 = i;
            archive.CreateEntry($"entry{i}.bin", () =>
            {
                byte[] data = new byte[1024 * 1024];
                rnd.NextBytes(data);
                return sourceStreams[i1] = new ConsoleLoggedMemoryStream(i1, data);
            });
        }

        Console.WriteLine("End adding entries");
        
        int currentlyCompressedStreamNumber = -1;
        
        ArchiveSaveOptions saveOptions = new ArchiveSaveOptions { EventsBag = new EventsBag() };
        saveOptions.EventsBag.EntryAccessed += delegate(object? sender, EntryEventArgs eventArgs)
        {
            Console.WriteLine($"Starting {eventArgs.Entry.Name} compression"); 
            // In a single thread the entries are processed in order.
            currentlyCompressedStreamNumber = int.Parse(numberExtractor.Match(eventArgs.Entry.Name).Groups["number"].Value);
            // Or just currentlyCompressedStreamNumber++, which is fine with such entry names.
        };
        saveOptions.EventsBag.EntryCompressed += delegate(object? sender, CancelEntryEventArgs eventArgs)
        {
            Console.WriteLine($"Entry {eventArgs.Entry.Name} proceeded"); 
            sourceStreams[currentlyCompressedStreamNumber].Close();
        };
        
        archive.Save(zipFile, saveOptions); 
        Console.WriteLine("End save");
        GC.Collect(); 
        // stream memory is NOT released here, even though streams are not really needed anymore - still true without hack
        Console.WriteLine("End GC collect 1");

    } // streams are only closed here - now twice, first time was within EntryCompressed 
    Console.WriteLine("End archive dispose");
}
Console.WriteLine("End file stream dispose");
GC.Collect(); // stream memory is released here
Console.WriteLine("End GC collect 2");
Console.WriteLine("All Done");

public class ConsoleLoggedMemoryStream : MemoryStream
{
    private readonly int _number;
    private static FieldInfo bufferField;

    static ConsoleLoggedMemoryStream()
    {
        bufferField = typeof(MemoryStream).GetField("_buffer", BindingFlags.NonPublic | BindingFlags.Instance);
    }
    public ConsoleLoggedMemoryStream(int number, byte[] data) : base(data, 0, data.Length, true, true)
    {
        _number = number;
        Console.WriteLine("Stream {0} opening", _number);
    }
    public override void Close()
    {
        Console.WriteLine("Stream {0} closing", _number);

        //DIRTY HACK - release internal buffer of MemoryStream
        //bufferField.SetValue(this, null);

        base.Close();
    }
}

All reasoning below assumes you compress in a single thread.

First, you need to keep references to all the actual streams; the sourceStreams array stores them.
In the EntryAccessed event handler you determine which stream is being compressed: it’s sourceStreams[currentlyCompressedStreamNumber]. Right after the entry compression is done, you can close that stream yourself in the EntryCompressed event handler.

It is correct that entry sources are not disposed within archive.Save, if only because it is possible to add more entries and save the archive a second time.
__
I do not know when your production stream actually releases its memory, but in this sample with a MemoryStream descendant the memory is not released on ConsoleLoggedMemoryStream.Close. This is how MemoryStream works: Close does not drop the internal buffer. As a result, the LOH grows to several gigabytes in my sample with 5000 entries.

To make the sample consume little memory, we need to force the internal buffer to be released on ConsoleLoggedMemoryStream.Close. The dirty hack (commented out in Close above) does this. With timely cleanup, the LOH stays within several megabytes. This mostly proves that your goal is feasible.
__
BTW, have you tried multi-threaded ZIP composition? If all your input data is of the same order of magnitude and there is enough RAM, you can benefit a lot. It will require tricky stream management but still seems feasible.

Hello @Eugene.Shashkov
Thank you for your response.

You are correct that my example does not use closures or events; I was trying to demonstrate the core issue with the memory consumption of Archive.
The expectation was that since the stream is allocated within Save, it would also be released within Save (both stream.Close/Dispose called and the stream reference dropped), as if there were a using statement. Without that, the memory pressure point is just moved later, from the CreateEntry phase to the Save phase.

I understand the possibility of multiple Save calls on the same Archive instance, but in that case the second Save call could, or maybe even should, call the streamProvider delegate a second time and get another stream instance with the same content. That requires the streamProvider delegate to be idempotent, but that makes sense for a factory method. And it is not an issue at all for the majority of consumers, who likely call the save method only once.

Another case where the delegate should be called a second time, instead of reusing the stream from the first call, is a non-seekable stream: once it has been read to the end, it cannot be used again anyway. A second delegate call allows another stream instance to be allocated at position 0.

In the case you presented, a second Save call will likely not work since the streams are now closed. And, as you correctly mentioned, the memory may not be fully released, depending on the implementation of stream.Close, since the archive still keeps a reference to the closed stream until the archive itself is closed or disposed.

So, overall, the expectation is that streams are created right before the entry is written and are closed, with the reference released, right after that, and that a second Save call creates a new stream with the same content by calling the same delegate instance once again.
This can either be the default behavior or be enabled by some kind of parameter/option on the archive.CreateEntry/archive.Save method.

As for the multithreaded scenario, we are not looking into that at the moment, as we are still struggling to make the simple case work for our requirements. We will consider going multithreaded once the single-threaded case works well for us.

Please let me know if all of the above makes sense to you.
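
The re-save expectation above can be sketched without the library. In this minimal example ReadAll is a hypothetical stand-in for one Save pass (open, read to the end, dispose), and the provider is an idempotent factory that returns a fresh stream at position 0 on every call.

```csharp
using System;
using System.IO;
using System.Text;

int opened = 0;

// Idempotent factory: every call yields a new stream with the same content.
Func<Stream> provider = () =>
{
    opened++;
    return new MemoryStream(Encoding.UTF8.GetBytes("same content"));
};

// Hypothetical stand-in for one Save pass: open, read to the end, dispose.
string ReadAll(Func<Stream> open)
{
    using Stream source = open();
    using var reader = new StreamReader(source);
    return reader.ReadToEnd();
}

string firstPass = ReadAll(provider);
string secondPass = ReadAll(provider);

Console.WriteLine(firstPass == secondPass); // both passes see identical content
Console.WriteLine(opened);                  // the provider was invoked once per pass
```

Because each pass owns its stream, nothing from the first pass has to be rewound or kept alive for the second one.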

Hello @shtaff,

I agree that on re-save streamProvider should be invoked a second time, rather than rewinding the stream it initially returned.

I’ll keep source stream disposal on archive disposal as it is, but I’ll also extend ArchiveSaveOptions with a flag indicating whether to dispose of entries’ sources immediately. I hope this and the EventsBag.EntryCompressed event will be enough for fine-grained stream management.

We may also provide read access to the entry’s source stream so that it can be handled more conveniently within the EntryCompressed handler or elsewhere.

Hello @Eugene.Shashkov
If there is an option to dispose of the stream right after use, and the reference to the stream instance is released right after that, it should cover all our needs.
That will minimize the number of streams that stay in memory at any moment in time, which is exactly what we need.

Thanks a lot, and we are looking forward to an update.

Hello,
please review the recent changes, which should suit your needs: the CloseEntrySource and DataSource properties.

Hello,

Thank you, we will review this in a little while and will get back to you.

Thanks!

@Eugene.Shashkov I have briefly tested the CloseEntrySource setting. The stream is closed right away, but it is still not disposed and released until the Archive instance itself is disposed. This leaves a possibility of a resource leak depending on the stream implementation, for example if internal buffers are not dereferenced upon close (e.g. MemoryStream.Close does not dereference its buffer, and its memory is only released when the stream instance is collected by the GC).

This implementation should be good enough in our case though.
In the worst case we can wrap the stream in a lightweight wrapper that fully disposes and dereferences the wrapped stream upon close. In that case there will be no significant memory leak; we will just hold on to a bunch of shell stream references, which is not too bad.

Thank you. We will work on integrating this into our code in some time.
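
A minimal sketch of such a wrapper, for illustration only. ReleasingStream and IsReleased are hypothetical names, the wrapper is read-only, and MemoryStream stands in for the real source stream. It relies on Stream.Close calling Dispose(true), so closing the wrapper disposes the inner stream and drops the only reference to it.

```csharp
using System;
using System.IO;

// Usage: after Close, the wrapper no longer references the 1 MB inner stream.
var wrapper = new ReleasingStream(new MemoryStream(new byte[1024 * 1024]));
var buffer = new byte[16];
int read = wrapper.Read(buffer, 0, buffer.Length);
wrapper.Close();
Console.WriteLine(wrapper.IsReleased);

// Hypothetical read-only shell around a source stream.
public sealed class ReleasingStream : Stream
{
    private Stream? _inner;

    public ReleasingStream(Stream inner) =>
        _inner = inner ?? throw new ArgumentNullException(nameof(inner));

    // True once the inner stream has been disposed and dereferenced.
    public bool IsReleased => _inner is null;

    private Stream Inner =>
        _inner ?? throw new ObjectDisposedException(nameof(ReleasingStream));

    public override bool CanRead => _inner?.CanRead ?? false;
    public override bool CanSeek => _inner?.CanSeek ?? false;
    public override bool CanWrite => false;
    public override long Length => Inner.Length;

    public override long Position
    {
        get => Inner.Position;
        set => Inner.Position = value;
    }

    public override int Read(byte[] buffer, int offset, int count) =>
        Inner.Read(buffer, offset, count);

    public override long Seek(long offset, SeekOrigin origin) =>
        Inner.Seek(offset, origin);

    public override void Flush() { }

    public override void SetLength(long value) =>
        throw new NotSupportedException();

    public override void Write(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();

    // Stream.Close() calls Dispose(true), so this also runs on Close.
    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            _inner?.Dispose();
            _inner = null; // drop the reference so the GC can reclaim the inner stream
        }
        base.Dispose(disposing);
    }
}
```

The archive would then hold only the small shell object until it is disposed, while the wrapped stream and its buffers become collectable as soon as the shell is closed.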

You are correct, the entry stream is closed, not disposed, if CloseEntrySource flag is set.