Streaming NSXMLParser with NSInputStream


Update:

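When you use NSXMLParser's initWithContentsOfURL, it appears to load the entire XML file into memory before it starts parsing, rather than parsing the feed as it downloads. That is a problem when the XML feed is large: it uses too much RAM, and it is inherently inefficient because parsing only starts once the download has finished instead of running in parallel with it.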

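Has anyone found a way to parse with NSXMLParser while the feed is being streamed to the device? Yes, you can use LibXML2 (as discussed below), but it seems like it should be possible with NSXMLParser. I just can't find a way to do it.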

Original question:

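I'm looking at using NSXMLParser to read XML from a web stream. If you use initWithContentsOfURL, the interface might lead you to infer that it streams the XML from the web, but that does not seem to be the case: it appears to load the entire XML file before doing any parsing. That is fine for modestly sized XML files, but it is a problem for really large ones.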

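I have seen discussions of using NSXMLParser's initWithStream with a custom NSInputStream that streams from the web. For example, some answers suggest something like the CFStreamCreateBoundPair approach mentioned in a Cocoa Builder post, along with the discussion of setting up socket streams in Apple's Stream Programming Guide, but I have not gotten it to work. I even tried writing my own NSInputStream subclass backed by NSURLConnection (which is itself very good at streaming), but I could not get it to work with NSXMLParser.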

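In the end, I decided to use LibXML2 rather than NSXMLParser, as demonstrated in Apple's XMLPerformance sample, but I was wondering whether anyone has had any luck streaming from a web source with NSXMLParser. I have seen plenty of "theoretically you could do x" answers, suggesting everything from CFStreamCreateBoundPair to grabbing the HTTPBodyStream from an NSURLRequest, but I have yet to come across a working demonstration of streaming with NSXMLParser.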

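Ray Wenderlich's article How To Choose The Best XML Parser for Your iPhone Project seems to confirm that NSXMLParser is not well suited to large XML files, but given all the posts about possible NSXMLParser-based workarounds for streaming really large XML files, I'm surprised I have yet to find a working demonstration. Does anyone know of a functioning NSXMLParser implementation that streams from the web? Clearly, I can just stick with LibXML2 or some other equivalent XML parser, but the notion of streaming with NSXMLParser seems tantalizingly close.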
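For comparison, the LibXML2 route mentioned in the question relies on the library's push interface, where the caller hands the parser each chunk as it arrives. The following is only a minimal sketch of that idea, not the code from Apple's XMLPerformance sample: CountElementsInChunks and StartElement are hypothetical names, the chunks are passed in as an array rather than coming from a live connection, and it assumes the project links against libxml2 (-lxml2) with the libxml2 header directory on the include path.

```objc
#import <Foundation/Foundation.h>
#import <libxml/parser.h>

// SAX2 callback: libxml2 calls this as soon as a start tag has been parsed.
// `ctx` is the user_data pointer given to xmlCreatePushParserCtxt below.
static void StartElement(void *ctx, const xmlChar *localname, const xmlChar *prefix,
                         const xmlChar *URI, int nb_namespaces, const xmlChar **namespaces,
                         int nb_attributes, int nb_defaulted, const xmlChar **attributes)
{
    NSUInteger *count = (NSUInteger *)ctx;
    (*count)++;
}

// Hypothetical helper: feeds the chunks to a push parser one at a time and
// returns the number of start tags seen. In a real download, each chunk would
// be the NSData delivered by a didReceiveData callback.
static NSUInteger CountElementsInChunks(NSArray<NSData *> *chunks)
{
    xmlSAXHandler handler;
    memset(&handler, 0, sizeof(handler));
    handler.initialized = XML_SAX2_MAGIC;      // enables the SAX2 (namespace-aware) callbacks
    handler.startElementNs = StartElement;

    NSUInteger count = 0;
    xmlParserCtxtPtr context = xmlCreatePushParserCtxt(&handler, &count, NULL, 0, NULL);
    if (context == NULL) {
        return 0;
    }
    for (NSData *chunk in chunks) {
        // Parse whatever is available; the last argument (0) means "more data follows".
        xmlParseChunk(context, chunk.bytes, (int)chunk.length, 0);
    }
    xmlParseChunk(context, NULL, 0, 1);        // signal end of document
    xmlFreeParserCtxt(context);
    return count;
}
```

Because the caller pushes data into the parser, nothing has to block waiting for the network, which is the main difference from the pull model NSXMLParser uses with an NSInputStream.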

2 Answers


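-[NSXMLParser initWithStream:] is the only interface NSXMLParser currently offers for parsing data as a stream. Hooking it up to an asynchronous NSURLConnection that delivers data incrementally is unwieldy, because NSXMLParser takes a blocking, "pull"-based approach to reading from the NSInputStream. That is, when working with an NSInputStream, -[NSXMLParser parse] does something like the following: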

while (1) {
    NSInteger length = [stream read:buffer maxLength:maxLength];
    if (!length)
        break;

    // Parse data …
}

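To feed data to this parser incrementally, you need a custom NSInputStream subclass that funnels the data received by the NSURLConnectionDelegate calls on a background queue or run loop into the -read:maxLength: call that NSXMLParser is waiting on.
A proof-of-concept implementation follows: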
#import <Foundation/Foundation.h>

@interface ReceivedDataStream : NSInputStream <NSURLConnectionDelegate>
@property (retain) NSURLConnection *connection;
@property (retain) NSMutableArray *bufferedData;
@property (assign, getter=isFinished) BOOL finished;
@property (retain) dispatch_semaphore_t semaphore;
@end

@implementation ReceivedDataStream

- (id)initWithContentsOfURL:(NSURL *)url
{
    if (!(self = [super init]))
        return nil;

    NSURLRequest *request = [NSURLRequest requestWithURL:url];
    self.connection = [[[NSURLConnection alloc] initWithRequest:request delegate:self startImmediately:NO] autorelease];
    // NSURLConnection exposes -setDelegateQueue: as a method, not as a declared property.
    [self.connection setDelegateQueue:[[[NSOperationQueue alloc] init] autorelease]];
    self.bufferedData = [NSMutableArray array];
    self.semaphore = dispatch_semaphore_create(0);

    return self;
}

- (void)dealloc
{
    self.connection = nil;
    self.bufferedData = nil;
    self.semaphore = nil;

    [super dealloc];
}

- (BOOL)hasBufferedData
{
    @synchronized (self) { return self.bufferedData.count > 0; }
}

#pragma mark - NSInputStream overrides

- (void)open
{
    NSLog(@"open");
    [self.connection start];
}

- (void)close
{
    NSLog(@"close");
    [self.connection cancel];
}

- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)maxLength
{
    NSLog(@"read:%p maxLength:%lu", buffer, (unsigned long)maxLength);
    if (self.isFinished && !self.hasBufferedData)
        return 0;

    if (!self.hasBufferedData)
        dispatch_semaphore_wait(self.semaphore, DISPATCH_TIME_FOREVER);

    NSAssert(self.isFinished || self.hasBufferedData, @"Was woken without new information");

    if (self.isFinished && !self.hasBufferedData)
        return 0;

    NSData *data = nil;
    @synchronized (self) {
        data = [[self.bufferedData[0] retain] autorelease];
        [self.bufferedData removeObjectAtIndex:0];
        if (data.length > maxLength) {
            NSData *remainingData = [NSData dataWithBytes:data.bytes + maxLength length:data.length - maxLength];
            [self.bufferedData insertObject:remainingData atIndex:0];
        }
    }

    NSUInteger copiedLength = MIN([data length], maxLength);
    memcpy(buffer, [data bytes], copiedLength);
    return copiedLength;
}


#pragma mark - NSURLConnectionDelegate methods

- (void)connection:(NSURLConnection *)connection didReceiveData:(NSData *)data
{
    NSLog(@"connection:%@ didReceiveData:…", connection);
    @synchronized (self) {
        [self.bufferedData addObject:data];
    }
    dispatch_semaphore_signal(self.semaphore);
}

- (void)connectionDidFinishLoading:(NSURLConnection *)connection
{
    NSLog(@"connectionDidFinishLoading:%@", connection);
    self.finished = YES;
    dispatch_semaphore_signal(self.semaphore);
}

@end

@interface ParserDelegate : NSObject <NSXMLParserDelegate>
@end

@implementation ParserDelegate

- (void)parser:(NSXMLParser *)parser didStartElement:(NSString *)elementName namespaceURI:(NSString *)namespaceURI qualifiedName:(NSString *)qualifiedName attributes:(NSDictionary *)attributeDict
{
    NSLog(@"parser:%@ didStartElement:%@ namespaceURI:%@ qualifiedName:%@ attributes:%@", parser, elementName, namespaceURI, qualifiedName, attributeDict);
}

- (void)parserDidEndDocument:(NSXMLParser *)parser
{
    NSLog(@"parserDidEndDocument:%@", parser);
    CFRunLoopStop(CFRunLoopGetCurrent());
}

@end


int main(int argc, char **argv)
{
    @autoreleasepool {

        NSURL *url = [NSURL URLWithString:@"http://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xml"];
        ReceivedDataStream *stream = [[ReceivedDataStream alloc] initWithContentsOfURL:url];
        NSXMLParser *parser = [[NSXMLParser alloc] initWithStream:stream];
        parser.delegate = [[[ParserDelegate alloc] init] autorelease];

        [parser performSelector:@selector(parse) withObject:nil afterDelay:0.0];

        CFRunLoopRun();

    }
    return 0;
}

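Thanks for the reply. When I tried to set up an NSURLConnection as a stream (following the examples I referenced in my original question), I never got it to work. I've seen lots of posts and articles with hypothetical "well, you should be able to do X", but I never found a working example, and never managed to get it working myself. Do you know of any working examples that combine initWithStream with NSURLConnection? - Rob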
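It's not as easy as it first appears, because NSXMLParser does not spin the run loop while reading from the stream, so you have to configure the NSURLConnection to deliver its delegate messages to a secondary thread or dispatch queue. Once you realize that, it's a simple matter of typing :) - bdash
Oh, and I'd strongly encourage you to file an enhancement request at http://bugreport.apple.com/ asking for an easier API. - bdash
I'd suggest an edit to the answer so that it is clearer. - bdash
Great. I've accordingly suggested an additional edit to this code to fix a semaphore bug. The code only works if read:maxLength: is always waiting on the semaphore when didReceiveData signals it. But if didReceiveData fills bufferedData and signals the semaphore before read:maxLength: calls dispatch_semaphore_wait, then read:maxLength: never consumes that signal, and on the next call to read:maxLength:, when it goes to wait on the semaphore, the earlier signal is still pending and the code crashes. - Rob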
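One way to avoid the stale-signal problem described in the last comment is to wait on a condition and re-check the state after every wake-up, instead of counting signals. The following is a rough sketch of that idea, not the edit that was actually proposed for the answer: ChunkQueue is a hypothetical helper class, and the code assumes ARC rather than the manual retain/release used in the answer. A stream subclass could forward didReceiveData to appendData:, connectionDidFinishLoading to finish, and its own read:maxLength: to the queue.

```objc
#import <Foundation/Foundation.h>

// Hypothetical helper: a blocking FIFO of NSData chunks shared between the
// connection's delegate queue (producer) and the parser's thread (consumer).
@interface ChunkQueue : NSObject
- (void)appendData:(NSData *)data;
- (void)finish;
- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)maxLength;
@end

@implementation ChunkQueue {
    NSCondition *_condition;
    NSMutableArray<NSData *> *_chunks;
    BOOL _finished;
}

- (instancetype)init
{
    if ((self = [super init])) {
        _condition = [[NSCondition alloc] init];
        _chunks = [NSMutableArray array];
    }
    return self;
}

- (void)appendData:(NSData *)data
{
    if (data.length == 0) return;        // an empty chunk would look like end of stream
    [_condition lock];
    [_chunks addObject:[data copy]];
    [_condition signal];
    [_condition unlock];
}

- (void)finish
{
    [_condition lock];
    _finished = YES;
    [_condition broadcast];
    [_condition unlock];
}

- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)maxLength
{
    [_condition lock];
    // The predicate is re-checked after every wake-up, so a signal sent before
    // this call started waiting cannot be mistaken for new data later on.
    while (_chunks.count == 0 && !_finished) {
        [_condition wait];
    }
    NSInteger copied = 0;                // 0 means end of stream
    if (_chunks.count > 0) {
        NSData *head = _chunks[0];
        NSUInteger length = MIN(head.length, maxLength);
        memcpy(buffer, head.bytes, length);
        if (length < head.length) {
            // Keep the unread tail at the front of the queue.
            _chunks[0] = [head subdataWithRange:NSMakeRange(length, head.length - length)];
        } else {
            [_chunks removeObjectAtIndex:0];
        }
        copied = (NSInteger)length;
    }
    [_condition unlock];
    return copied;
}

@end
```

With this shape it does not matter whether data arrives before or after the reader starts waiting, because the reader decides what to do from the queue contents and the finished flag rather than from the number of signals received.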

I noticed that bdash's answer uses NSURLConnection. According to the NSURLConnection documentation, however, that API is deprecated, so I replaced it with NSURLSessionDataTask.
#import <Foundation/Foundation.h>
#import <objc/objc-sync.h>

@interface RemoteInputStream : NSInputStream
+ (instancetype)new NS_UNAVAILABLE;
+ (instancetype)inputStreamWithData:(NSData *)data NS_UNAVAILABLE;
+ (instancetype)inputStreamWithFileAtPath:(NSString *)path NS_UNAVAILABLE;
- (instancetype)init NS_UNAVAILABLE;
- (instancetype)initWithData:(NSData *)data NS_UNAVAILABLE;
- (instancetype)initWithFileAtPath:(NSString *)path NS_UNAVAILABLE;

+ (instancetype)inputStreamWithRequest:(NSURLRequest *)request;
- (instancetype)initWithRequest:(NSURLRequest *)request NS_DESIGNATED_INITIALIZER;
@end

@interface RemoteInputStream () <NSURLSessionDataDelegate>
@property (retain) NSURLSessionDataTask *sessionDataTask;
@property (retain) NSMutableArray<NSData *> *bufferData;
@property (retain, nullable) dispatch_semaphore_t semaphore;
@end

@implementation RemoteInputStream

+ (instancetype)inputStreamWithRequest:(NSURLRequest *)request {
    return [[[self.class alloc] initWithRequest:request] autorelease];
}

- (instancetype)initWithURL:(NSURL *)url {
    NSURLRequest *request = [[NSURLRequest alloc] initWithURL:url];
    self = [self initWithRequest:request];
    [request release];
    return self;
}

- (instancetype)initWithRequest:(NSURLRequest *)request {
    if (self = [super initWithURL:request.URL]) {
        NSURLSession *session = [NSURLSession sessionWithConfiguration:NSURLSessionConfiguration.ephemeralSessionConfiguration];
        NSURLSessionDataTask *sessionDataTask = [session dataTaskWithRequest:request];
        self.sessionDataTask = sessionDataTask;
        
        NSMutableArray<NSData *> *bufferData = [NSMutableArray<NSData *> new];
        self.bufferData = bufferData;
        [bufferData release];
    }
    
    return self;
}

- (void)dealloc {
    [_sessionDataTask cancel];
    [_sessionDataTask release];
    [_bufferData release];
    
    if (_semaphore) {
        dispatch_release(_semaphore);
    }
    
    [super dealloc];
}

- (void)open {
    self.sessionDataTask.delegate = self;
    [self.sessionDataTask resume];
}

- (void)close {
    [self.sessionDataTask suspend];
}

- (NSInteger)read:(uint8_t *)buffer maxLength:(NSUInteger)len {
    objc_sync_enter(self);
    
    if (self.bufferData.count == 0) {
        if (self.sessionDataTask.state == NSURLSessionTaskStateRunning) {
            dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
            self.semaphore = semaphore;
            
            objc_sync_exit(self);
            
            dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
            objc_sync_enter(self);
            
            self.semaphore = nil;
            dispatch_release(semaphore);
            
            if (self.bufferData.count == 0) {
                objc_sync_exit(self);
                return 0;
            }
        } else {
            objc_sync_exit(self);
            return 0;
        }
    }
    
    NSMutableData *result = [NSMutableData new];
    NSUInteger remaining = len;
    
    while (YES) {
        NSAutoreleasePool *pool = [NSAutoreleasePool new];
        
        BOOL shouldBreak;
        
        if (remaining < self.bufferData[0].length) {
            NSData *data1 = [self.bufferData[0] subdataWithRange:NSMakeRange(0, remaining)];
            NSData *data2 = [self.bufferData[0] subdataWithRange:NSMakeRange(remaining, self.bufferData[0].length - remaining)];
            
            [result appendData:data1];
            [self.bufferData replaceObjectAtIndex:0 withObject:data2];
            remaining = 0;
            shouldBreak = YES;
        } else {
            [result appendData:self.bufferData[0]];
            remaining -= self.bufferData[0].length;
            [self.bufferData removeObjectAtIndex:0];
            
            if (self.bufferData.count == 0) {
                shouldBreak = YES;
            } else {
                shouldBreak = NO;
            }
        }
        
        [pool release];
        
        if (remaining == 0) {
            shouldBreak = YES;
        }
        
        if (shouldBreak) {
            break;
        }
    }
    
    objc_sync_exit(self);
    
    NSUInteger length = result.length;
    
    memcpy(buffer, result.bytes, length);
    [result release];
    
    return length;
}

#pragma mark - NSURLSessionDataDelegate

- (void)URLSession:(NSURLSession *)session dataTask:(NSURLSessionDataTask *)dataTask didReceiveData:(NSData *)data {
    objc_sync_enter(self);
    [self.bufferData addObject:data];
    
    if (self.semaphore) {
        dispatch_semaphore_signal(self.semaphore);
    }
    
    objc_sync_exit(self);
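}

// Wake a blocked -read:maxLength: when the task completes or fails, so the reader
// sees end of stream instead of waiting forever for data that will never arrive.
- (void)URLSession:(NSURLSession *)session task:(NSURLSessionTask *)task didCompleteWithError:(NSError *)error {
    objc_sync_enter(self);
    
    if (self.semaphore) {
        dispatch_semaphore_signal(self.semaphore);
    }
    
    objc_sync_exit(self);
}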

@end

An example unit test:
#import <XCTest/XCTest.h>

@interface RemoteInputStreamTests : XCTestCase
@end

@implementation RemoteInputStreamTests

- (void)test_read {
    NSURL *testURL = [NSURL URLWithString:@"https://fastly.picsum.photos/id/11/2500/1667.jpg?hmac=xxjFJtAPgshYkysU_aqx2sZir-kIOjNR9vx0te7GycQ"];
    NSData *normalData = [NSData dataWithContentsOfURL:testURL];
    
    RemoteInputStream *inputStream = [RemoteInputStream inputStreamWithURL:testURL];
    [inputStream open];
    
    NSUInteger maxLength = 16;
    uint8_t *buffer = malloc(sizeof(uint8_t) * maxLength);
    NSUInteger len = [inputStream read:buffer maxLength:maxLength];
    NSMutableData *streamingData = [NSMutableData new];
    
    while (len) {
        [streamingData appendBytes:buffer length:len];
        len = [inputStream read:buffer maxLength:maxLength];
    }
    
    free(buffer);
    
    XCTAssertTrue([normalData isEqualToData:streamingData]);
    
    [streamingData release];
}

@end

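I agree about NSURLSession, but I would have reservations about NSURLSessionDataTask. I would lean towards NSURLSessionDownloadTask (which avoids loading the resource into memory) and then do buffered streaming from the file. You really don't want to block in NSURLSessionDataTask. - Rob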
Also, in defense of bdash's use of NSURLConnection, his answer predates NSURLSession. - Rob
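The download-task approach suggested in the comments above could look roughly like the sketch below. It is an illustration under assumptions, not code from either answer: ParseXMLFileAtURL and DownloadAndParseXML are hypothetical helper names, the shared session is used for brevity, and ARC is assumed. Note the trade-off: parsing only begins once the download has finished, but the document is read from disk through a stream rather than being held in memory as a whole.

```objc
#import <Foundation/Foundation.h>

// Hypothetical helper: parse an XML file from disk through an input stream,
// so NSXMLParser can pull it in pieces via -read:maxLength:.
static BOOL ParseXMLFileAtURL(NSURL *fileURL, id<NSXMLParserDelegate> delegate)
{
    NSInputStream *stream = [NSInputStream inputStreamWithURL:fileURL];
    if (stream == nil) {
        return NO;
    }
    NSXMLParser *parser = [[NSXMLParser alloc] initWithStream:stream];
    parser.delegate = delegate;
    return [parser parse];               // blocks until parsing finishes or fails
}

// Hypothetical helper: download to a temporary file, then parse that file.
// `completion` is called off the main thread with the parse result.
static void DownloadAndParseXML(NSURL *remoteURL, id<NSXMLParserDelegate> delegate,
                                void (^completion)(BOOL success))
{
    NSURLSessionDownloadTask *task = [[NSURLSession sharedSession]
        downloadTaskWithURL:remoteURL
          completionHandler:^(NSURL *location, NSURLResponse *response, NSError *error) {
              // The temporary file at `location` is removed once this handler
              // returns, so it is parsed here rather than later.
              completion(location != nil && ParseXMLFileAtURL(location, delegate));
          }];
    [task resume];
}
```

A caller would pass its own NSXMLParserDelegate and hop back to the main queue inside the completion block if it needs to update UI.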
