Streaming NSXMLParser with NSInputStream

Update:

When using NSXMLParser's initWithContentsOfURL: initializer, the parser does not parse the XML feed as it downloads; it appears to load the entire XML file into memory and only then start parsing. This is problematic for large feeds: it uses an excessive amount of RAM, and it is inherently inefficient because parsing cannot run in parallel with the download and only begins once the download is complete.

Has anyone discovered how to parse as the feed is being streamed to the device using NSXMLParser? Yes, you can use LibXML2 (as discussed below), but it seems like it should be possible to do it with NSXMLParser. But it's eluding me.

Original question:

I was wrestling with using NSXMLParser to read XML from a web stream. The initWithContentsOfURL: interface may lead one to infer that it streams the XML from the web, but it doesn't seem to do so; rather, it appears to load the entire XML file before any parsing takes place. For modest-sized XML files that's fine, but for really large ones it's problematic.

I have seen discussions of using NSXMLParser's initWithStream in conjunction with a customized NSInputStream that streams from the web. For example, there have been answers that suggest using something like the CFStreamCreateBoundPair referred to in the following Cocoa Builder post and the discussion of Setting Up Socket Streams in the Apple Stream Programming Guide, but I have not gotten it to work. I even tried writing my own NSInputStream subclass that used an NSURLConnection (which is, itself, pretty good at streaming), but I wasn't able to get it to work in conjunction with NSXMLParser.

In the end, I decided to use LibXML2 rather than NSXMLParser, as demonstrated in the Apple XMLPerformance sample, but I was wondering if anyone had any luck getting streaming from a web source working with NSXMLParser. I've seen plenty of "theoretically you could do x" sort of answers, suggesting everything from CFStreamCreateBoundPair to grabbing the HTTPBodyStream from NSURLRequest, but I've yet to come across a working demonstration of streaming with NSXMLParser.

The Ray Wenderlich article How To Choose The Best XML Parser for Your iPhone Project seems to confirm that NSXMLParser is not well suited for large XML files, but with all of the posts about possible NSXMLParser-based workarounds for streaming really large XML files, I'm surprised I have yet to find a working demonstration of this. Does anyone know of a functioning NSXMLParser implementation that streams from the web? Clearly, I can just stick with LibXML2 or some other equivalent XML parser, but the notion of streaming with NSXMLParser seems tantalizingly close.

Frons answered 6/12, 2012 at 9:2 Comment(0)

-[NSXMLParser initWithStream:] is the only interface to NSXMLParser that currently performs a streaming parse of the data. Hooking it up to an asynchronous NSURLConnection that's providing data incrementally is unwieldy because NSXMLParser takes a blocking, "pull"-based approach to reading from the NSInputStream. That is, -[NSXMLParser parse] does something like the following when dealing with an NSInputStream:

while (1) {
    NSInteger length = [stream read:buffer maxLength:maxLength];
    if (length <= 0)  // 0 means end of stream, a negative value means an error
        break;

    // Parse data …
}

In order to incrementally provide data to this parser, a custom NSInputStream subclass is needed. It funnels the data received by the NSURLConnection delegate calls, which arrive on a background queue or runloop, over to the -read:maxLength: call that NSXMLParser is blocked on.

A proof-of-concept implementation follows:

#import <Foundation/Foundation.h>

// didReceiveData: and connectionDidFinishLoading: are declared in NSURLConnectionDataDelegate.
@interface ReceivedDataStream : NSInputStream <NSURLConnectionDataDelegate>
@property (retain) NSURLConnection *connection;
@property (retain) NSMutableArray *bufferedData;
@property (assign, getter=isFinished) BOOL finished;
@property (retain) dispatch_semaphore_t semaphore;
@end

@implementation ReceivedDataStream

- (id)initWithContentsOfURL:(NSURL *)url
{
    if (!(self = [super init]))
        return nil;

    NSURLRequest *request = [NSURLRequest requestWithURL:url];
    self.connection = [[[NSURLConnection alloc] initWithRequest:request delegate:self startImmediately:NO] autorelease];
    self.connection.delegateQueue = [[[NSOperationQueue alloc] init] autorelease];
    self.bufferedData = [NSMutableArray array];
    self.semaphore = dispatch_semaphore_create(0);

    return self;
}

- (void)dealloc
{
    self.connection = nil;
    self.bufferedData = nil;
    self.semaphore = nil;

    [super dealloc];
}

- (BOOL)hasBufferedData
{
    @synchronized (self) { return self.bufferedData.count > 0; }
}

#pragma mark - NSInputStream overrides

- (void)open
{
    NSLog(@"open");
    [self.connection start];
}

- (void)close
{
    NSLog(@"close");
    [self.connection cancel];
}

- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)maxLength
{
    NSLog(@"read:%p maxLength:%lu", buffer, (unsigned long)maxLength);
    if (self.isFinished && !self.hasBufferedData)
        return 0;

    if (!self.hasBufferedData)
        dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);

    NSAssert(self.isFinished || self.hasBufferedData, @"Was woken without new information");

    if (self.isFinished && !self.hasBufferedData)
        return 0;

    NSData *data = nil;
    @synchronized (self) {
        data = [[self.bufferedData[0] retain] autorelease];
        [self.bufferedData removeObjectAtIndex:0];
        if (data.length > maxLength) {
            NSData *remainingData = [NSData dataWithBytes:data.bytes + maxLength length:data.length - maxLength];
            [self.bufferedData insertObject:remainingData atIndex:0];
        }
    }

    NSUInteger copiedLength = MIN([data length], maxLength);
    memcpy(buffer, [data bytes], copiedLength);
    return copiedLength;
}


#pragma mark - NSURLConnectionDataDelegate methods

- (void)connection:(NSURLConnection *)connection didReceiveData:(NSData *)data
{
    NSLog(@"connection:%@ didReceiveData:…", connection);
    @synchronized (self) {
        [self.bufferedData addObject:data];
    }
    dispatch_semaphore_signal(self.semaphore);
}

- (void)connectionDidFinishLoading:(NSURLConnection *)connection
{
    NSLog(@"connectionDidFinishLoading:%@", connection);
    self.finished = YES;
    dispatch_semaphore_signal(self.semaphore);
}

@end

@interface ParserDelegate : NSObject <NSXMLParserDelegate>
@end

@implementation ParserDelegate

- (void)parser:(NSXMLParser *)parser didStartElement:(NSString *)elementName namespaceURI:(NSString *)namespaceURI qualifiedName:(NSString *)qualifiedName attributes:(NSDictionary *)attributeDict
{
    NSLog(@"parser:%@ didStartElement:%@ namespaceURI:%@ qualifiedName:%@ attributes:%@", parser, elementName, namespaceURI, qualifiedName, attributeDict);
}

- (void)parserDidEndDocument:(NSXMLParser *)parser
{
    NSLog(@"parserDidEndDocument:%@", parser);
    CFRunLoopStop(CFRunLoopGetCurrent());
}

@end


int main(int argc, char **argv)
{
    @autoreleasepool {

        NSURL *url = [NSURL URLWithString:@"http://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml"];
        ReceivedDataStream *stream = [[ReceivedDataStream alloc] initWithContentsOfURL:url];
        NSXMLParser *parser = [[NSXMLParser alloc] initWithStream:stream];
        parser.delegate = [[[ParserDelegate alloc] init] autorelease];

        [parser performSelector:@selector(parse) withObject:nil afterDelay:0.0];

        CFRunLoopRun();

    }
    return 0;
}
Alius answered 26/1, 2013 at 20:6 Comment(7)
Thanks for the reply. When I tried setting up an NSURLConnection as a stream (following the examples I referenced in my original question), I never got it to work. I see lots of posts and articles with hypothetical "well, you should be able to do X", but I never found a working example, and I was never able to get one working myself. Do you know of any working examples that marry initWithStream with NSURLConnection? - Frons
It seems like an NSURLConnection delegate could feed the data it receives to an instance of an NSInputStream subclass. I'll throw something together to confirm it works. - Alius
It's not quite as easy as it first seems, because NSXMLParser reads from the stream without spinning the runloop at all, so you have to configure the NSURLConnection to send the delegate messages to a secondary thread or dispatch queue. Once that realization is made, it is a simple matter of typing though :) - Alius
Oh, and I'd strongly suggest filing an enhancement request at bugreport.apple.com asking for a less cumbersome API for this. - Alius
First, thanks again for your help here. Second, there were a few minor issues with your above code sample, which I think I've remedied in my answer below. I'm going to leave your answer as "accepted", because you're the one that really solved the issue for me and pointed me in the right direction. Thanks again! - Frons
I'd suggest that proposing an edit to the answer would be less confusing. - Alius
Excellent. I'd therefore recommend an additional edit to this code to fix the semaphore bug. This code only works if read:maxLength: is always waiting on the semaphore by the time didReceiveData signals it. But if didReceiveData fills bufferedData and signals the semaphore before read:maxLength: calls dispatch_semaphore_wait, then read:maxLength: will not consume that signal. On the next call of read:maxLength:, when it goes to wait on the semaphore, the previous signal is still sitting there, so the wait returns immediately with nothing buffered and this code then crashes. - Frons
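The race described in the comment just above (a signal that arrives before the reader starts waiting and is left behind as a stale wake-up) is the usual reason to wait on a predicate rather than on a bare signal. What follows is only a minimal sketch of that idea in plain C with pthreads, not the code from the answer. The names chunk_buffer, chunk_buffer_push, chunk_buffer_finish and chunk_buffer_read are hypothetical, and the fixed 4096-byte capacity is an arbitrary assumption. The reader re-checks "is there data, or is the stream finished?" in a loop while holding the lock, so early or extra wake-ups are harmless.

```c
#include <assert.h>
#include <pthread.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical byte buffer shared by a producer (the network callback)
   and a consumer (the parser's blocking read). */
typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  changed;      /* signalled whenever length or finished changes */
    unsigned char   bytes[4096];  /* assumed capacity, chosen arbitrarily */
    size_t          length;       /* bytes currently buffered */
    int             finished;     /* non-zero once the producer has no more data */
} chunk_buffer;

void chunk_buffer_init(chunk_buffer *b) {
    pthread_mutex_init(&b->lock, NULL);
    pthread_cond_init(&b->changed, NULL);
    b->length = 0;
    b->finished = 0;
}

/* Producer side: store up to n bytes and wake the reader.
   Returns the number of bytes accepted; a real stream would have to
   block or grow the buffer instead of accepting fewer bytes. */
size_t chunk_buffer_push(chunk_buffer *b, const void *data, size_t n) {
    pthread_mutex_lock(&b->lock);
    size_t space = sizeof b->bytes - b->length;
    if (n > space)
        n = space;
    memcpy(b->bytes + b->length, data, n);
    b->length += n;
    pthread_cond_broadcast(&b->changed);
    pthread_mutex_unlock(&b->lock);
    return n;
}

/* Producer side: mark end of stream and wake the reader. */
void chunk_buffer_finish(chunk_buffer *b) {
    pthread_mutex_lock(&b->lock);
    b->finished = 1;
    pthread_cond_broadcast(&b->changed);
    pthread_mutex_unlock(&b->lock);
}

/* Consumer side: block until data is available or the stream is finished.
   Returns the number of bytes copied, or 0 at end of stream. */
size_t chunk_buffer_read(chunk_buffer *b, void *out, size_t max) {
    assert(b != NULL && out != NULL);
    pthread_mutex_lock(&b->lock);
    /* The predicate is re-checked after every wake-up, so it does not
       matter whether the signal arrived before or after we began waiting. */
    while (b->length == 0 && !b->finished)
        pthread_cond_wait(&b->changed, &b->lock);
    size_t n = b->length < max ? b->length : max;
    memcpy(out, b->bytes, n);
    /* Keep whatever did not fit in the caller's buffer for the next read. */
    memmove(b->bytes, b->bytes + n, b->length - n);
    b->length -= n;
    pthread_mutex_unlock(&b->lock);
    return n;
}
```

In the Objective-C stream, the same shape would presumably have -read:maxLength: play the role of chunk_buffer_read, didReceiveData: the role of chunk_buffer_push, and connectionDidFinishLoading: the role of chunk_buffer_finish, with something like NSCondition standing in for the mutex and condition variable.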

I noticed that bdash's answer uses NSURLConnection, but according to the NSURLConnection documentation:

This API is considered legacy. Use NSURLSession instead.

So I replaced it with NSURLSessionDataTask.

#import <Foundation/Foundation.h>
#import <objc/objc-sync.h>

@interface RemoteInputStream : NSInputStream
+ (instancetype)new NS_UNAVAILABLE;
+ (instancetype)inputStreamWithData:(NSData *)data NS_UNAVAILABLE;
+ (instancetype)inputStreamWithFileAtPath:(NSString *)path NS_UNAVAILABLE;
- (instancetype)init NS_UNAVAILABLE;
- (instancetype)initWithData:(NSData *)data NS_UNAVAILABLE;
- (instancetype)initWithFileAtPath:(NSString *)path NS_UNAVAILABLE;

+ (instancetype)inputStreamWithRequest:(NSURLRequest *)request;
- (instancetype)initWithRequest:(NSURLRequest *)request NS_DESIGNATED_INITIALIZER;
@end

@interface RemoteInputStream () <NSURLSessionDataDelegate>
@property (retain) NSURLSessionDataTask *sessionDataTask;
@property (retain) NSMutableArray<NSData *> *bufferData;
@property (retain, nullable) dispatch_semaphore_t semaphore;
@end

@implementation RemoteInputStream

+ (instancetype)inputStreamWithRequest:(NSURLRequest *)request {
    return [[[self.class alloc] initWithRequest:request] autorelease];
}

- (instancetype)initWithURL:(NSURL *)url {
    NSURLRequest *request = [[NSURLRequest alloc] initWithURL:url];
    self = [self initWithRequest:request];
    [request release];
    return self;
}

- (instancetype)initWithRequest:(NSURLRequest *)request {
    if (self = [super initWithURL:request.URL]) {
        NSURLSession *session = [NSURLSession sessionWithConfiguration:NSURLSessionConfiguration.ephemeralSessionConfiguration];
        NSURLSessionDataTask *sessionDataTask = [session dataTaskWithRequest:request];
        self.sessionDataTask = sessionDataTask;
        
        NSMutableArray<NSData *> *bufferData = [NSMutableArray<NSData *> new];
        self.bufferData = bufferData;
        [bufferData release];
    }
    
    return self;
}

- (void)dealloc {
    [_sessionDataTask cancel];
    [_sessionDataTask release];
    [_bufferData release];
    
    if (_semaphore) {
        dispatch_release(_semaphore);
    }
    
    [super dealloc];
}

- (void)open {
    self.sessionDataTask.delegate = self;
    [self.sessionDataTask resume];
}

- (void)close {
    [self.sessionDataTask suspend];
}

- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)len {
    objc_sync_enter(self);
    
    if (self.bufferData.count == 0) {
        if (self.sessionDataTask.state == NSURLSessionTaskStateRunning) {
            dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
            self.semaphore = semaphore;
            
            objc_sync_exit(self);
            
            dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
            objc_sync_enter(self);
            
            self.semaphore = nil;
            dispatch_release(semaphore);
            
            if (self.bufferData.count == 0) {
                objc_sync_exit(self);
                return 0;
            }
        } else {
            objc_sync_exit(self);
            return 0;
        }
    }
    
    NSMutableData *result = [NSMutableData new];
    NSUInteger remaining = len;
    
    while (YES) {
        NSAutoreleasePool *pool = [NSAutoreleasePool new];
        
        BOOL shouldBreak;
        
        if (remaining < self.bufferData[0].length) {
            NSData *data1 = [self.bufferData[0] subdataWithRange:NSMakeRange(0, remaining)];
            NSData *data2 = [self.bufferData[0] subdataWithRange:NSMakeRange(remaining, self.bufferData[0].length - remaining)];
            
            [result appendData:data1];
            [self.bufferData replaceObjectAtIndex:0 withObject:data2];
            remaining = 0;
            shouldBreak = YES;
        } else {
            [result appendData:self.bufferData[0]];
            remaining -= self.bufferData[0].length;
            [self.bufferData removeObjectAtIndex:0];
            
            if (self.bufferData.count == 0) {
                shouldBreak = YES;
            } else {
                shouldBreak = NO;
            }
        }
        
        [pool release];
        
        if (remaining == 0) {
            shouldBreak = YES;
        }
        
        if (shouldBreak) {
            break;
        }
    }
    
    objc_sync_exit(self);
    
    NSUInteger length = result.length;
    
    memcpy(buffer, result.bytes, length);
    [result release];
    
    return length;
}

#pragma mark - NSURLSessionDataDelegate

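- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data {
    objc_sync_enter(self);
    [self.bufferData addObject:data];
    
    if (self.semaphore) {
        dispatch_semaphore_signal(self.semaphore);
    }
    
    objc_sync_exit(self);
}

// Without this, a reader that is already waiting when the task ends is never woken.
- (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task didCompleteWithError:(NSError *)error {
    objc_sync_enter(self);
    
    // Wake the reader so -read:maxLength: can see the empty buffer and return 0.
    if (self.semaphore) {
        dispatch_semaphore_signal(self.semaphore);
    }
    
    objc_sync_exit(self);
}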

@end

An example of unit test code:

#import <XCTest/XCTest.h>

@interface RemoteInputStreamTests : XCTestCase
@end

@implementation RemoteInputStreamTests

- (void)test_read {
    NSURL *testURL = [NSURL URLWithString:@"https://fastly.picsum.photos/id/11/2500/1667.jpg?hmac=xxjFJtAPgshYkysU_aqx2sZir-kIOjNR9vx0te7GycQ"];
    NSData *normalData = [NSData dataWithContentsOfURL:testURL];
    
    RemoteInputStream *inputStream = [RemoteInputStream inputStreamWithURL:testURL];
    [inputStream open];
    
    NSUInteger maxLength = 16;
    uint8_t *buffer = malloc(sizeof(uint8_t) * maxLength);
    // -read:maxLength: returns a signed NSInteger (negative on error), so loop only while it is positive.
    NSInteger len = [inputStream read:buffer maxLength:maxLength];
    NSMutableData *streamingData = [NSMutableData new];
    
    while (len > 0) {
        [streamingData appendBytes:buffer length:(NSUInteger)len];
        len = [inputStream read:buffer maxLength:maxLength];
    }
    
    free(buffer);
    
    XCTAssertTrue([normalData isEqualToData:streamingData]);
    
    [streamingData release];
}

@end
Unstressed answered 10/6, 2023 at 15:49 Comment(2)
I agree re NSURLSession but I might be wary of NSURLSessionDataTask. I would be inclined to use NSURLSessionDownloadTask (which avoids loading the asset in memory) and then do a buffered stream from the file. You really don’t want to block within a NSURLSessionDataTask. - Frons
And, in defense of bdash’s use of NSURLConnection, his answer predated NSURLSession. 😉 - Frons